1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(amqqueue). %% Could become amqqueue_v2 in the future.
9
10-include_lib("rabbit_common/include/rabbit.hrl").
11-include("amqqueue.hrl").
12
13-export([new/8,
14         new/9,
15         new_with_version/9,
16         new_with_version/10,
17         fields/0,
18         fields/1,
19         field_vhost/0,
20         record_version_to_use/0,
21         upgrade/1,
22         upgrade_to/2,
23         % arguments
24         get_arguments/1,
25         set_arguments/2,
26         % decorators
27         get_decorators/1,
28         set_decorators/2,
29         % exclusive_owner
30         get_exclusive_owner/1,
31         % gm_pids
32         get_gm_pids/1,
33         set_gm_pids/2,
34         get_leader/1,
35         % name (#resource)
36         get_name/1,
37         set_name/2,
38         % operator_policy
39         get_operator_policy/1,
40         set_operator_policy/2,
41         % options
42         get_options/1,
43         set_options/2,
44         % pid
45         get_pid/1,
46         set_pid/2,
47         % policy
48         get_policy/1,
49         set_policy/2,
50         % policy_version
51         get_policy_version/1,
52         set_policy_version/2,
53         % type_state
54         get_type_state/1,
55         set_type_state/2,
56         % recoverable_slaves
57         get_recoverable_slaves/1,
58         set_recoverable_slaves/2,
59         % slave_pids
60         get_slave_pids/1,
61         set_slave_pids/2,
62         % slave_pids_pending_shutdown
63         get_slave_pids_pending_shutdown/1,
64         set_slave_pids_pending_shutdown/2,
65         % state
66         get_state/1,
67         set_state/2,
68         % sync_slave_pids
69         get_sync_slave_pids/1,
70         set_sync_slave_pids/2,
71         get_type/1,
72         get_vhost/1,
73         is_amqqueue/1,
74         is_auto_delete/1,
75         is_durable/1,
76         is_classic/1,
77         is_quorum/1,
78         pattern_match_all/0,
79         pattern_match_on_name/1,
80         pattern_match_on_type/1,
81         reset_mirroring_and_decorators/1,
82         set_immutable/1,
83         qnode/1,
84         macros/0]).
85
%% Record version implemented by this module. record_version_to_use/0
%% returns it when the `quorum_queue' feature flag is enabled.
-define(record_version, amqqueue_v2).

%% Guard-safe test: true when T names a classic queue, spelled either as
%% ?amqqueue_v1_type or as the atom `classic'.
-define(is_backwards_compat_classic(T),
        (T =:= ?amqqueue_v1_type orelse T =:= classic)).
89
%% The v2 queue record.
%%
%% Field order is significant: upgrade_to/2 builds a v2 record by appending
%% `type' and `type_state' to an older tuple, and macros/0 prints
%% position-based accessors. Every field type also admits '_' so the record
%% can be used as a match pattern (see the pattern_match_* functions).
-record(amqqueue, {
          %% immutable
          name :: rabbit_amqqueue:name() | '_',
          %% immutable
          durable :: boolean() | '_',
          %% immutable
          auto_delete :: boolean() | '_',
          %% immutable
          exclusive_owner = none :: pid() | none | '_',
          %% immutable
          arguments = [] :: rabbit_framing:amqp_table() | '_',
          %% durable (just so we know home node)
          pid :: pid() | ra_server_id() | none | '_',
          %% transient
          slave_pids = [] :: [pid()] | none | '_',
          %% transient
          sync_slave_pids = [] :: [pid()] | none | '_',
          %% durable
          recoverable_slaves = [] :: [atom()] | none | '_',
          %% durable, implicit update
          policy :: binary() | none | undefined | '_',
          %% durable, implicit update
          operator_policy :: binary() | none | undefined | '_',
          %% transient
          gm_pids = [] :: [{pid(), pid()}] | none | '_',
          %% transient, recalculated
          decorators :: [atom()] | none | undefined | '_',
          %% durable (have we crashed?)
          state = live :: atom() | none | '_',
          policy_version = 0 :: non_neg_integer() | '_',
          slave_pids_pending_shutdown = [] :: [pid()] | '_',
          %% secondary index
          vhost :: rabbit_types:vhost() | undefined | '_',
          options = #{} :: map() | '_',
          %% new in v2
          type = ?amqqueue_v1_type :: module() | '_',
          %% new in v2
          type_state = #{} :: map() | '_'
         }).
119
%% A queue record in either supported layout: the older one owned by
%% amqqueue_v1, or the v2 layout defined in this module.
-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().

%% A concrete v2 queue record, i.e. the #amqqueue{} record with the '_'
%% wildcard removed from every field type.
%%
%% `type_state' is declared as map(): in type syntax `#{}' denotes only the
%% EMPTY map, which would contradict the record definition and the specs of
%% get_type_state/1 and set_type_state/2, all of which allow any map.
-type amqqueue_v2() :: #amqqueue{
                          name :: rabbit_amqqueue:name(),
                          durable :: boolean(),
                          auto_delete :: boolean(),
                          exclusive_owner :: pid() | none,
                          arguments :: rabbit_framing:amqp_table(),
                          pid :: pid() | ra_server_id() | none,
                          slave_pids :: [pid()] | none,
                          sync_slave_pids :: [pid()] | none,
                          recoverable_slaves :: [atom()] | none,
                          policy :: binary() | none | undefined,
                          operator_policy :: binary() | none | undefined,
                          gm_pids :: [{pid(), pid()}] | none,
                          decorators :: [atom()] | none | undefined,
                          state :: atom() | none,
                          policy_version :: non_neg_integer(),
                          slave_pids_pending_shutdown :: [pid()],
                          vhost :: rabbit_types:vhost() | undefined,
                          options :: map(),
                          type :: atom(),
                          type_state :: map()
                         }.
143
%% A {Name, Node} pair; accepted wherever a queue `pid' is stored.
%% get_leader/1 and qnode/1 read the node from the second element.
-type ra_server_id() :: {Name :: atom(),
                         Node :: node()}.
145
%% A match pattern over queue records, in either supported layout.
-type amqqueue_pattern() :: amqqueue_v1:amqqueue_v1_pattern()
                          | amqqueue_v2_pattern().

%% A v2 match pattern as produced by the pattern_match_* functions: only
%% `name' and `type' may be bound, every other field is the '_' wildcard.
-type amqqueue_v2_pattern() :: #amqqueue{
                                  name :: rabbit_amqqueue:name() | '_',
                                  durable :: '_',
                                  auto_delete :: '_',
                                  exclusive_owner :: '_',
                                  arguments :: '_',
                                  pid :: '_',
                                  slave_pids :: '_',
                                  sync_slave_pids :: '_',
                                  recoverable_slaves :: '_',
                                  policy :: '_',
                                  operator_policy :: '_',
                                  gm_pids :: '_',
                                  decorators :: '_',
                                  state :: '_',
                                  policy_version :: '_',
                                  slave_pids_pending_shutdown :: '_',
                                  vhost :: '_',
                                  options :: '_',
                                  type :: atom() | '_',
                                  type_state :: '_'
                                 }.
170
171-export_type([amqqueue/0,
172              amqqueue_v2/0,
173              amqqueue_pattern/0,
174              amqqueue_v2_pattern/0,
175              ra_server_id/0]).
176
%% Builds a queue record of the default queue type.
%%
%% Equivalent to new/9 with ?amqqueue_v1_type as the last argument. The
%% arguments are validated by the guards; anything else fails with
%% function_clause.
-spec new(Name :: rabbit_amqqueue:name(),
          Pid :: pid() | ra_server_id() | none,
          Durable :: boolean(),
          AutoDelete :: boolean(),
          Owner :: pid() | none,
          Args :: rabbit_framing:amqp_table(),
          VHost :: rabbit_types:vhost() | undefined,
          Options :: map()) -> amqqueue().

new(#resource{kind = queue} = Name, Pid, Durable, AutoDelete, Owner, Args,
    VHost, Options)
  when is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none,
       is_boolean(Durable),
       is_boolean(AutoDelete),
       is_pid(Owner) orelse Owner =:= none,
       is_list(Args),
       is_binary(VHost) orelse VHost =:= undefined,
       is_map(Options) ->
    new(Name, Pid, Durable, AutoDelete, Owner, Args, VHost, Options,
        ?amqqueue_v1_type).
210
%% Builds a queue record of the given type, in whichever record layout
%% record_version_to_use/0 currently selects.
%%
%% When that layout is the v2 one, the record is built here through
%% new_with_version/10; otherwise construction is delegated to
%% amqqueue_v1:new/9. Arguments failing the guards raise function_clause.
-spec new(Name :: rabbit_amqqueue:name(),
          Pid :: pid() | ra_server_id() | none,
          Durable :: boolean(),
          AutoDelete :: boolean(),
          Owner :: pid() | none,
          Args :: rabbit_framing:amqp_table(),
          VHost :: rabbit_types:vhost() | undefined,
          Options :: map(),
          Type :: atom()) -> amqqueue().

new(#resource{kind = queue} = Name, Pid, Durable, AutoDelete, Owner, Args,
    VHost, Options, Type)
  when is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none,
       is_boolean(Durable),
       is_boolean(AutoDelete),
       is_pid(Owner) orelse Owner =:= none,
       is_list(Args),
       is_binary(VHost) orelse VHost =:= undefined,
       is_map(Options),
       is_atom(Type) ->
    case record_version_to_use() of
        ?record_version = V2 ->
            new_with_version(V2, Name, Pid, Durable, AutoDelete, Owner, Args,
                             VHost, Options, Type);
        _Older ->
            amqqueue_v1:new(Name, Pid, Durable, AutoDelete, Owner, Args,
                            VHost, Options, Type)
    end.
263
%% Builds a queue record of the default queue type in an explicitly
%% requested record layout.
%%
%% Equivalent to new_with_version/10 with ?amqqueue_v1_type as the queue
%% type. Arguments failing the guards raise function_clause.
-spec new_with_version(RecordVersion :: amqqueue_v1 | amqqueue_v2,
                       Name :: rabbit_amqqueue:name(),
                       Pid :: pid() | ra_server_id() | none,
                       Durable :: boolean(),
                       AutoDelete :: boolean(),
                       Owner :: pid() | none,
                       Args :: rabbit_framing:amqp_table(),
                       VHost :: rabbit_types:vhost() | undefined,
                       Options :: map()) -> amqqueue().

new_with_version(RecordVersion, #resource{kind = queue} = Name, Pid, Durable,
                 AutoDelete, Owner, Args, VHost, Options)
  when is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none,
       is_boolean(Durable),
       is_boolean(AutoDelete),
       is_pid(Owner) orelse Owner =:= none,
       is_list(Args),
       is_binary(VHost) orelse VHost =:= undefined,
       is_map(Options) ->
    new_with_version(RecordVersion, Name, Pid, Durable, AutoDelete, Owner,
                     Args, VHost, Options, ?amqqueue_v1_type).
301
%% Builds a queue record of the given type in an explicitly requested
%% record layout.
%%
%% First clause: the v2 layout is requested, so the record is built here;
%% the type goes through ensure_type_compat/1 first.
%% Second clause: any other layout is delegated to
%% amqqueue_v1:new_with_version/9, which takes no type argument, so this
%% is only accepted for a classic-compatible Type. Every other combination
%% raises function_clause.
-spec new_with_version(RecordVersion :: amqqueue_v1 | amqqueue_v2,
                       Name :: rabbit_amqqueue:name(),
                       Pid :: pid() | ra_server_id() | none,
                       Durable :: boolean(),
                       AutoDelete :: boolean(),
                       Owner :: pid() | none,
                       Args :: rabbit_framing:amqp_table(),
                       VHost :: rabbit_types:vhost() | undefined,
                       Options :: map(),
                       Type :: atom()) -> amqqueue().

new_with_version(?record_version, #resource{kind = queue} = Name, Pid,
                 Durable, AutoDelete, Owner, Args, VHost, Options, Type)
  when is_pid(Pid) orelse is_tuple(Pid) orelse Pid =:= none,
       is_boolean(Durable),
       is_boolean(AutoDelete),
       is_pid(Owner) orelse Owner =:= none,
       is_list(Args),
       is_binary(VHost) orelse VHost =:= undefined,
       is_map(Options),
       is_atom(Type) ->
    %% Fields not listed here keep the defaults from the record definition.
    #amqqueue{name            = Name,
              pid             = Pid,
              durable         = Durable,
              auto_delete     = AutoDelete,
              exclusive_owner = Owner,
              arguments       = Args,
              vhost           = VHost,
              options         = Options,
              type            = ensure_type_compat(Type)};
new_with_version(OtherVersion, Name, Pid, Durable, AutoDelete, Owner, Args,
                 VHost, Options, Type)
  when ?is_backwards_compat_classic(Type) ->
    amqqueue_v1:new_with_version(OtherVersion, Name, Pid, Durable, AutoDelete,
                                 Owner, Args, VHost, Options).
362
%% Tells whether Term is a queue record: true for a v2 record, otherwise
%% whatever amqqueue_v1:is_amqqueue/1 answers.
-spec is_amqqueue(any()) -> boolean().

is_amqqueue(Term) ->
    case Term of
        #amqqueue{} -> true;
        _           -> amqqueue_v1:is_amqqueue(Term)
    end.
367
%% Selects the record layout new queue records should use: the v2 layout
%% once the `quorum_queue' feature flag is enabled, otherwise whatever
%% amqqueue_v1:record_version_to_use/0 selects.
-spec record_version_to_use() -> amqqueue_v1 | amqqueue_v2.

record_version_to_use() ->
    case rabbit_feature_flags:is_enabled(quorum_queue) of
        false -> amqqueue_v1:record_version_to_use();
        true  -> ?record_version
    end.
375
%% Brings a queue record to the layout currently in use. A v2 record is
%% returned untouched; anything else goes through upgrade_to/2.
-spec upgrade(amqqueue()) -> amqqueue().

upgrade(#amqqueue{} = AlreadyV2) ->
    AlreadyV2;
upgrade(OldQueue) ->
    TargetVersion = record_version_to_use(),
    upgrade_to(TargetVersion, OldQueue).
380
%% Converts a queue record to the requested layout.
%%
%% Targeting v2: a v2 record is returned as is; any other tuple gets the
%% two v2-only fields appended (`type' = ?amqqueue_v1_type and
%% `type_state' = undefined) and the result must match #amqqueue{},
%% otherwise the call fails with badmatch.
%% Any other target version is delegated to amqqueue_v1:upgrade_to/2.
%%
%% NOTE(review): the appended `type_state' is `undefined', whereas the
%% record default is #{} and the field is typed map() -- confirm this is
%% intended before relying on get_type_state/1 returning a map.
-spec upgrade_to
(amqqueue_v2, amqqueue()) -> amqqueue_v2();
(amqqueue_v1, amqqueue_v1:amqqueue_v1()) -> amqqueue_v1:amqqueue_v1().

upgrade_to(?record_version, #amqqueue{} = AlreadyV2) ->
    AlreadyV2;
upgrade_to(?record_version, OldQueue) ->
    WithType = erlang:append_element(OldQueue, ?amqqueue_v1_type),
    #amqqueue{} = erlang:append_element(WithType, undefined);
upgrade_to(OtherVersion, OldQueue) ->
    amqqueue_v1:upgrade_to(OtherVersion, OldQueue).
393
394% arguments
395
%% Reads the `arguments' field; non-v2 records go to amqqueue_v1.
-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table().

get_arguments(#amqqueue{} = Q) -> Q#amqqueue.arguments;
get_arguments(OlderQ)          -> amqqueue_v1:get_arguments(OlderQ).
402
%% Returns the queue record with `arguments' replaced by Args; non-v2
%% records go to amqqueue_v1.
-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue().

set_arguments(#amqqueue{} = Q, Args) -> Q#amqqueue{arguments = Args};
set_arguments(OlderQ, Args)          -> amqqueue_v1:set_arguments(OlderQ, Args).
409
410% decorators
411
%% Reads the `decorators' field; non-v2 records go to amqqueue_v1.
-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.

get_decorators(#amqqueue{} = Q) -> Q#amqqueue.decorators;
get_decorators(OlderQ)          -> amqqueue_v1:get_decorators(OlderQ).
418
%% Returns the queue record with `decorators' replaced; non-v2 records go
%% to amqqueue_v1.
-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue().

set_decorators(#amqqueue{} = Q, NewDecorators) ->
    Q#amqqueue{decorators = NewDecorators};
set_decorators(OlderQ, NewDecorators) ->
    amqqueue_v1:set_decorators(OlderQ, NewDecorators).
425
%% Reads the `exclusive_owner' field (a pid, or `none'); non-v2 records go
%% to amqqueue_v1.
-spec get_exclusive_owner(amqqueue()) -> pid() | none.

get_exclusive_owner(#amqqueue{} = Q) ->
    Q#amqqueue.exclusive_owner;
get_exclusive_owner(OlderQ) ->
    amqqueue_v1:get_exclusive_owner(OlderQ).
432
433% gm_pids
434
%% Reads the `gm_pids' field; non-v2 records go to amqqueue_v1.
-spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none.

get_gm_pids(#amqqueue{} = Q) -> Q#amqqueue.gm_pids;
get_gm_pids(OlderQ)          -> amqqueue_v1:get_gm_pids(OlderQ).
441
%% Returns the queue record with `gm_pids' replaced; non-v2 records go to
%% amqqueue_v1.
-spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue().

set_gm_pids(#amqqueue{} = Q, NewPids) -> Q#amqqueue{gm_pids = NewPids};
set_gm_pids(OlderQ, NewPids)          -> amqqueue_v1:set_gm_pids(OlderQ, NewPids).
448
%% Returns the node stored in the second element of a quorum queue's
%% `pid' pair. Only matches v2 records whose type is rabbit_quorum_queue
%% and whose `pid' is a 2-tuple; anything else raises function_clause.
-spec get_leader(amqqueue_v2()) -> node().

get_leader(#amqqueue{type = rabbit_quorum_queue,
                     pid  = {_Name, LeaderNode}}) ->
    LeaderNode.
452
453% operator_policy
454
%% Reads the `operator_policy' field; non-v2 records go to amqqueue_v1.
-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.

get_operator_policy(#amqqueue{} = Q) ->
    Q#amqqueue.operator_policy;
get_operator_policy(OlderQ) ->
    amqqueue_v1:get_operator_policy(OlderQ).
459
%% Returns the queue record with `operator_policy' replaced; non-v2
%% records go to amqqueue_v1.
-spec set_operator_policy(amqqueue(), binary() | none | undefined) ->
    amqqueue().

set_operator_policy(#amqqueue{} = Q, OpPolicy) ->
    Q#amqqueue{operator_policy = OpPolicy};
set_operator_policy(OlderQ, OpPolicy) ->
    amqqueue_v1:set_operator_policy(OlderQ, OpPolicy).
467
468% name
469
%% Reads the `name' field (the queue's resource); non-v2 records go to
%% amqqueue_v1.
-spec get_name(amqqueue()) -> rabbit_amqqueue:name().

get_name(#amqqueue{} = Q) ->
    Q#amqqueue.name;
get_name(OlderQ) ->
    amqqueue_v1:get_name(OlderQ).
474
%% Returns the queue record with `name' replaced; non-v2 records go to
%% amqqueue_v1.
-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().

set_name(#amqqueue{} = Q, NewName) -> Q#amqqueue{name = NewName};
set_name(OlderQ, NewName)          -> amqqueue_v1:set_name(OlderQ, NewName).
481
%% Reads the `options' map; non-v2 records go to amqqueue_v1.
-spec get_options(amqqueue()) -> map().

get_options(#amqqueue{} = Q) ->
    Q#amqqueue.options;
get_options(OlderQ) ->
    amqqueue_v1:get_options(OlderQ).
486
%% Returns the queue record with the `options' map replaced; non-v2
%% records go to amqqueue_v1.
-spec set_options(amqqueue(), map()) -> amqqueue().

set_options(#amqqueue{} = Q, NewOptions) -> Q#amqqueue{options = NewOptions};
set_options(OlderQ, NewOptions)          -> amqqueue_v1:set_options(OlderQ, NewOptions).
493
494% pid
495
%% Reads the `pid' field. Per the spec, a v2 record may hold a pid, a
%% ra_server_id() pair or `none'. Non-v2 records go to amqqueue_v1.
-spec get_pid
(amqqueue_v2()) -> pid() | ra_server_id() | none;
(amqqueue_v1:amqqueue_v1()) -> pid() | none.

get_pid(#amqqueue{} = Q) ->
    Q#amqqueue.pid;
get_pid(OlderQ) ->
    amqqueue_v1:get_pid(OlderQ).
502
%% Returns the queue record with `pid' replaced; non-v2 records go to
%% amqqueue_v1.
-spec set_pid
(amqqueue_v2(), pid() | ra_server_id() | none) -> amqqueue_v2();
(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1().

set_pid(#amqqueue{} = Q, NewPid) -> Q#amqqueue{pid = NewPid};
set_pid(OlderQ, NewPid)          -> amqqueue_v1:set_pid(OlderQ, NewPid).
511
512% policy
513
%% Reads the `policy' field; non-v2 records go to amqqueue_v1.
%%
%% NOTE(review): this spec advertises proplists:proplist(), while the
%% record field and set_policy/2 are typed binary() -- confirm which one
%% reflects what is really stored.
-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined.

get_policy(#amqqueue{} = Q) ->
    Q#amqqueue.policy;
get_policy(OlderQ) ->
    amqqueue_v1:get_policy(OlderQ).
518
%% Returns the queue record with `policy' replaced; non-v2 records go to
%% amqqueue_v1.
-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue().

set_policy(#amqqueue{} = Q, NewPolicy) -> Q#amqqueue{policy = NewPolicy};
set_policy(OlderQ, NewPolicy)          -> amqqueue_v1:set_policy(OlderQ, NewPolicy).
525
526% policy_version
527
%% Reads the `policy_version' counter; non-v2 records go to amqqueue_v1.
-spec get_policy_version(amqqueue()) -> non_neg_integer().

get_policy_version(#amqqueue{} = Q) -> Q#amqqueue.policy_version;
get_policy_version(OlderQ)          -> amqqueue_v1:get_policy_version(OlderQ).
534
%% Returns the queue record with `policy_version' replaced; non-v2 records
%% go to amqqueue_v1.
-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue().

set_policy_version(#amqqueue{} = Q, Version) ->
    Q#amqqueue{policy_version = Version};
set_policy_version(OlderQ, Version) ->
    amqqueue_v1:set_policy_version(OlderQ, Version).
541
542% recoverable_slaves
543
%% Reads the `recoverable_slaves' field; non-v2 records go to amqqueue_v1.
-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none.

get_recoverable_slaves(#amqqueue{} = Q) ->
    Q#amqqueue.recoverable_slaves;
get_recoverable_slaves(OlderQ) ->
    amqqueue_v1:get_recoverable_slaves(OlderQ).
550
%% Returns the queue record with `recoverable_slaves' replaced; non-v2
%% records go to amqqueue_v1.
-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue().

set_recoverable_slaves(#amqqueue{} = Q, Nodes) ->
    Q#amqqueue{recoverable_slaves = Nodes};
set_recoverable_slaves(OlderQ, Nodes) ->
    amqqueue_v1:set_recoverable_slaves(OlderQ, Nodes).
557
558% type_state (new in v2)
559
%% Reads the v2-only `type_state' field. Anything that is not a v2 record
%% has no such field, so an empty map is returned for it.
-spec get_type_state(amqqueue()) -> map().
get_type_state(#amqqueue{} = Q) -> Q#amqqueue.type_state;
get_type_state(_NotV2)          -> #{}.
565
%% Returns the queue record with the v2-only `type_state' field replaced.
%% Anything that is not a v2 record is returned unchanged and the new
%% state is silently dropped.
-spec set_type_state(amqqueue(), map()) -> amqqueue().
set_type_state(#amqqueue{} = Q, NewState) -> Q#amqqueue{type_state = NewState};
set_type_state(NotV2, _Ignored)           -> NotV2.
571
572% slave_pids
573
%% Reads the `slave_pids' field; non-v2 records go to amqqueue_v1.
-spec get_slave_pids(amqqueue()) -> [pid()] | none.

get_slave_pids(#amqqueue{} = Q) -> Q#amqqueue.slave_pids;
get_slave_pids(OlderQ)          -> amqqueue_v1:get_slave_pids(OlderQ).
580
%% Returns the queue record with `slave_pids' replaced; non-v2 records go
%% to amqqueue_v1.
-spec set_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().

set_slave_pids(#amqqueue{} = Q, Pids) -> Q#amqqueue{slave_pids = Pids};
set_slave_pids(OlderQ, Pids)          -> amqqueue_v1:set_slave_pids(OlderQ, Pids).
587
588% slave_pids_pending_shutdown
589
%% Reads the `slave_pids_pending_shutdown' field; non-v2 records go to
%% amqqueue_v1.
-spec get_slave_pids_pending_shutdown(amqqueue()) -> [pid()].

get_slave_pids_pending_shutdown(#amqqueue{} = Q) ->
    Q#amqqueue.slave_pids_pending_shutdown;
get_slave_pids_pending_shutdown(OlderQ) ->
    amqqueue_v1:get_slave_pids_pending_shutdown(OlderQ).
597
%% Returns the queue record with `slave_pids_pending_shutdown' replaced;
%% non-v2 records go to amqqueue_v1.
-spec set_slave_pids_pending_shutdown(amqqueue(), [pid()]) -> amqqueue().

set_slave_pids_pending_shutdown(#amqqueue{} = Q, Pending) ->
    Q#amqqueue{slave_pids_pending_shutdown = Pending};
set_slave_pids_pending_shutdown(OlderQ, Pending) ->
    amqqueue_v1:set_slave_pids_pending_shutdown(OlderQ, Pending).
604
605% state
606
%% Reads the `state' field; non-v2 records go to amqqueue_v1.
-spec get_state(amqqueue()) -> atom() | none.

get_state(#amqqueue{} = Q) ->
    Q#amqqueue.state;
get_state(OlderQ) ->
    amqqueue_v1:get_state(OlderQ).
611
%% Returns the queue record with `state' replaced; non-v2 records go to
%% amqqueue_v1.
-spec set_state(amqqueue(), atom() | none) -> amqqueue().

set_state(#amqqueue{} = Q, NewState) -> Q#amqqueue{state = NewState};
set_state(OlderQ, NewState)          -> amqqueue_v1:set_state(OlderQ, NewState).
618
619% sync_slave_pids
620
%% Reads the `sync_slave_pids' field; non-v2 records go to amqqueue_v1.
-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none.

get_sync_slave_pids(#amqqueue{} = Q) -> Q#amqqueue.sync_slave_pids;
get_sync_slave_pids(OlderQ)          -> amqqueue_v1:get_sync_slave_pids(OlderQ).
627
%% Returns the queue record with `sync_slave_pids' replaced; non-v2
%% records go to amqqueue_v1.
-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().

set_sync_slave_pids(#amqqueue{} = Q, SyncPids) ->
    Q#amqqueue{sync_slave_pids = SyncPids};
set_sync_slave_pids(OlderQ, SyncPids) ->
    amqqueue_v1:set_sync_slave_pids(OlderQ, SyncPids).
634
635%% New in v2.
636
%% Returns the queue type. A v2 record carries it in its `type' field;
%% any other term accepted by ?is_amqqueue/1 has no such field and is
%% reported as ?amqqueue_v1_type. Everything else raises function_clause.
-spec get_type(amqqueue()) -> atom().

get_type(#amqqueue{} = Q) ->
    Q#amqqueue.type;
get_type(OlderQ) when ?is_amqqueue(OlderQ) ->
    ?amqqueue_v1_type.
641
%% Reads the `vhost' field; non-v2 records go to amqqueue_v1.
-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.

get_vhost(#amqqueue{} = Q) ->
    Q#amqqueue.vhost;
get_vhost(OlderQ) ->
    amqqueue_v1:get_vhost(OlderQ).
646
%% Reads the `auto_delete' flag; non-v2 records go to amqqueue_v1.
-spec is_auto_delete(amqqueue()) -> boolean().

is_auto_delete(#amqqueue{} = Q) -> Q#amqqueue.auto_delete;
is_auto_delete(OlderQ)          -> amqqueue_v1:is_auto_delete(OlderQ).
653
%% Reads the `durable' flag; non-v2 records go to amqqueue_v1.
-spec is_durable(amqqueue()) -> boolean().

is_durable(#amqqueue{} = Q) ->
    Q#amqqueue.durable;
is_durable(OlderQ) ->
    amqqueue_v1:is_durable(OlderQ).
658
%% True when get_type/1 reports ?amqqueue_v1_type for the queue.
-spec is_classic(amqqueue()) -> boolean().

is_classic(Queue) ->
    Type = get_type(Queue),
    Type =:= ?amqqueue_v1_type.
663
%% True when get_type/1 reports rabbit_quorum_queue for the queue.
-spec is_quorum(amqqueue()) -> boolean().

is_quorum(Queue) ->
    Type = get_type(Queue),
    Type =:= rabbit_quorum_queue.
668
%% Field names of the record layout currently in use: the v2 ones when
%% record_version_to_use/0 selects v2, otherwise amqqueue_v1:fields/0.
fields() ->
    case record_version_to_use() =:= ?record_version of
        true  -> fields(?record_version);
        false -> amqqueue_v1:fields()
    end.
674
%% Field names of the requested record layout: the #amqqueue{} fields for
%% v2, otherwise whatever amqqueue_v1:fields/1 reports for that version.
fields(Version) when Version =:= ?record_version ->
    record_info(fields, amqqueue);
fields(OtherVersion) ->
    amqqueue_v1:fields(OtherVersion).
677
%% Tuple position of the `vhost' field in the record layout currently in
%% use: the v2 position, or amqqueue_v1:field_vhost/0 for older layouts.
field_vhost() ->
    case record_version_to_use() =:= ?record_version of
        true  -> #amqqueue.vhost;
        false -> amqqueue_v1:field_vhost()
    end.
683
%% Match pattern with every field set to the '_' wildcard, expressed in
%% the record layout currently in use.
-spec pattern_match_all() -> amqqueue_pattern().

pattern_match_all() ->
    case record_version_to_use() =:= ?record_version of
        true  -> #amqqueue{_ = '_'};
        false -> amqqueue_v1:pattern_match_all()
    end.
691
%% Match pattern that binds only `name' (all other fields are '_'),
%% expressed in the record layout currently in use.
-spec pattern_match_on_name(rabbit_amqqueue:name()) -> amqqueue_pattern().

pattern_match_on_name(Name) ->
    case record_version_to_use() =:= ?record_version of
        true  -> #amqqueue{name = Name, _ = '_'};
        false -> amqqueue_v1:pattern_match_on_name(Name)
    end.
699
%% Match pattern selecting queues of the given type.
%%
%% With the v2 layout in use the pattern binds only `type'. Older layouts
%% have no `type' field, so a classic-compatible Type yields the match-all
%% pattern, and any other Type yields a name pattern that is expected to
%% match nothing.
-spec pattern_match_on_type(atom()) -> amqqueue_pattern().

pattern_match_on_type(Type) ->
    UsingV2 = record_version_to_use() =:= ?record_version,
    if
        UsingV2 ->
            #amqqueue{type = Type, _ = '_'};
        ?is_backwards_compat_classic(Type) ->
            amqqueue_v1:pattern_match_all();
        true ->
            %% FIXME: We try a pattern which should never match when the
            %% `quorum_queue` feature flag is not enabled yet. Is there
            %% a better solution?
            NeverMatches = rabbit_misc:r(<<0>>, queue, <<0>>),
            amqqueue_v1:pattern_match_on_name(NeverMatches)
    end.
715
%% Returns the queue record with `slave_pids', `sync_slave_pids' and
%% `gm_pids' emptied and `decorators' set to undefined. Non-v2 records go
%% to amqqueue_v1.
-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue().

reset_mirroring_and_decorators(#amqqueue{} = Q) ->
    Q#amqqueue{decorators      = undefined,
               gm_pids         = [],
               sync_slave_pids = [],
               slave_pids      = []};
reset_mirroring_and_decorators(OlderQ) ->
    amqqueue_v1:reset_mirroring_and_decorators(OlderQ).
725
%% Returns the queue record with the fields listed below overwritten by
%% fixed placeholder values; all remaining fields are kept. Note that
%% `slave_pids' becomes [] while the others become `none'. Non-v2 records
%% go to amqqueue_v1.
-spec set_immutable(amqqueue()) -> amqqueue().

set_immutable(#amqqueue{} = Q) ->
    Q#amqqueue{state              = none,
               decorators         = none,
               policy             = none,
               gm_pids            = none,
               recoverable_slaves = none,
               sync_slave_pids    = none,
               slave_pids         = [],
               pid                = none};
set_immutable(OlderQ) ->
    amqqueue_v1:set_immutable(OlderQ).
739
%% Returns the node a queue lives on. Accepts a queue record (its `pid'
%% field is used), a pid (its node), or a 2-tuple (its second element).
%% The record clause must stay first, since queue records are tuples too.
-spec qnode(amqqueue() | pid() | ra_server_id()) -> node().

qnode(Queue) when ?is_amqqueue(Queue) -> qnode(get_pid(Queue));
qnode(Pid) when is_pid(Pid)           -> node(Pid);
qnode({_Name, Node})                  -> Node.
749
% private helpers (note: macros/0 is exported; it only prints `-define' lines)
751
%% Prints `-define' lines for the v2 record: first an is_<version>(Q)
%% test based on the record size, then one positional accessor per field
%% (see macros/2). Returns ok.
macros() ->
    RecordSize = record_info(size, amqqueue),
    io:format(
      "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n",
      [?record_version, RecordSize]),
    FieldNames = record_info(fields, amqqueue),
    %% Element 1 of a record tuple is the record name, so the first field
    %% sits at position 2.
    macros(FieldNames, 2).
759
%% Prints one `<version>_field_<name>(Q)' accessor definition per field,
%% numbering tuple positions upwards from Pos. Returns ok.
macros([], _Pos) ->
    ok;
macros([FieldName | Remaining], Pos) ->
    io:format(
      "-define(~s_field_~s(Q), element(~b, Q)).~n",
      [?record_version, FieldName, Pos]),
    macros(Remaining, Pos + 1).
767
%% Normalises the queue type stored in a v2 record: the atom `classic'
%% is rewritten to ?amqqueue_v1_type, any other value is kept as given.
ensure_type_compat(classic) -> ?amqqueue_v1_type;
ensure_type_compat(Other)   -> Other.
772