1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2018-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(amqqueue_v1).
9
10-include_lib("rabbit_common/include/resource.hrl").
11-include("amqqueue.hrl").
12
13-export([new/8,
14         new/9,
15         new_with_version/9,
16         new_with_version/10,
17         fields/0,
18         fields/1,
19         field_vhost/0,
20         record_version_to_use/0,
21         upgrade/1,
22         upgrade_to/2,
23         % arguments
24         get_arguments/1,
25         set_arguments/2,
26         % decorators
27         get_decorators/1,
28         set_decorators/2,
29         % exclusive_owner
30         get_exclusive_owner/1,
31         % gm_pids
32         get_gm_pids/1,
33         set_gm_pids/2,
34         get_leader/1,
35         % name (#resource)
36         get_name/1,
37         set_name/2,
38         % operator_policy
39         get_operator_policy/1,
40         set_operator_policy/2,
41         % options
42         get_options/1,
43         set_options/2,
44         % pid
45         get_pid/1,
46         set_pid/2,
47         % policy
48         get_policy/1,
49         set_policy/2,
50         % policy_version
51         get_policy_version/1,
52         set_policy_version/2,
53         % type_state
54         get_type_state/1,
55         set_type_state/2,
56         % recoverable_slaves
57         get_recoverable_slaves/1,
58         set_recoverable_slaves/2,
59         % slave_pids
60         get_slave_pids/1,
61         set_slave_pids/2,
62         % slave_pids_pending_shutdown
63         get_slave_pids_pending_shutdown/1,
64         set_slave_pids_pending_shutdown/2,
65         % state
66         get_state/1,
67         set_state/2,
68         % sync_slave_pids
69         get_sync_slave_pids/1,
70         set_sync_slave_pids/2,
71         get_type/1,
72         get_vhost/1,
73         is_amqqueue/1,
74         is_auto_delete/1,
75         is_durable/1,
76         is_classic/1,
77         is_quorum/1,
78         pattern_match_all/0,
79         pattern_match_on_name/1,
80         pattern_match_on_type/1,
81         reset_mirroring_and_decorators/1,
82         set_immutable/1,
83         qnode/1,
84         macros/0]).
85
86-define(record_version, ?MODULE).
87-define(is_backwards_compat_classic(T),
88        (T =:= classic orelse T =:= ?amqqueue_v1_type)).
89
90-record(amqqueue, {
91          name :: rabbit_amqqueue:name() | '_', %% immutable
92          durable :: boolean() | '_',           %% immutable
93          auto_delete :: boolean() | '_',       %% immutable
94          exclusive_owner = none :: pid() | none | '_', %% immutable
95          arguments = [] :: rabbit_framing:amqp_table() | '_', %% immutable
96          pid :: pid() | none | '_',            %% durable (just so we
97                                                %% know home node)
98          slave_pids = [] :: [pid()] | none | '_',    %% transient
99          sync_slave_pids = [] :: [pid()] | none| '_',%% transient
100          recoverable_slaves = [] :: [atom()] | none | '_', %% durable
101          policy :: binary() | none | undefined | '_', %% durable, implicit
102                                                       %% update as above
103          operator_policy :: binary() | none | undefined | '_', %% durable,
104                                                                %% implicit
105                                                                %% update
106                                                                %% as above
107          gm_pids = [] :: [{pid(), pid()}] | none | '_', %% transient
108          decorators :: [atom()] | none | undefined | '_', %% transient,
109                                                          %% recalculated
110                                                          %% as above
111          state = live :: atom() | none | '_', %% durable (have we crashed?)
112          policy_version = 0 :: non_neg_integer() | '_',
113          slave_pids_pending_shutdown = [] :: [pid()] | '_',
114          vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
115          options = #{} :: map() | '_'
116         }).
117
118-type amqqueue() :: amqqueue_v1().
119-type amqqueue_v1() :: #amqqueue{
120                          name :: rabbit_amqqueue:name(),
121                          durable :: boolean(),
122                          auto_delete :: boolean(),
123                          exclusive_owner :: pid() | none,
124                          arguments :: rabbit_framing:amqp_table(),
125                          pid :: pid() | none,
126                          slave_pids :: [pid()] | none,
127                          sync_slave_pids :: [pid()] | none,
128                          recoverable_slaves :: [atom()] | none,
129                          policy :: binary() | none | undefined,
130                          operator_policy :: binary() | none | undefined,
131                          gm_pids :: [{pid(), pid()}] | none,
132                          decorators :: [atom()] | none | undefined,
133                          state :: atom() | none,
134                          policy_version :: non_neg_integer(),
135                          slave_pids_pending_shutdown :: [pid()],
136                          vhost :: rabbit_types:vhost() | undefined,
137                          options :: map()
138                         }.
139
140-type amqqueue_pattern() :: amqqueue_v1_pattern().
141-type amqqueue_v1_pattern() :: #amqqueue{
142                                  name :: rabbit_amqqueue:name() | '_',
143                                  durable :: '_',
144                                  auto_delete :: '_',
145                                  exclusive_owner :: '_',
146                                  arguments :: '_',
147                                  pid :: '_',
148                                  slave_pids :: '_',
149                                  sync_slave_pids :: '_',
150                                  recoverable_slaves :: '_',
151                                  policy :: '_',
152                                  operator_policy :: '_',
153                                  gm_pids :: '_',
154                                  decorators :: '_',
155                                  state :: '_',
156                                  policy_version :: '_',
157                                  slave_pids_pending_shutdown :: '_',
158                                  vhost :: '_',
159                                  options :: '_'
160                                 }.
161
162-export_type([amqqueue/0,
163              amqqueue_v1/0,
164              amqqueue_pattern/0,
165              amqqueue_v1_pattern/0]).
166
167-spec new(rabbit_amqqueue:name(),
168          pid() | none,
169          boolean(),
170          boolean(),
171          pid() | none,
172          rabbit_framing:amqp_table(),
173          rabbit_types:vhost() | undefined,
174          map()) -> amqqueue().
175
176new(#resource{kind = queue} = Name,
177    Pid,
178    Durable,
179    AutoDelete,
180    Owner,
181    Args,
182    VHost,
183    Options)
184  when (is_pid(Pid) orelse Pid =:= none) andalso
185       is_boolean(Durable) andalso
186       is_boolean(AutoDelete) andalso
187       (is_pid(Owner) orelse Owner =:= none) andalso
188       is_list(Args) andalso
189       (is_binary(VHost) orelse VHost =:= undefined) andalso
190       is_map(Options) ->
191    new_with_version(
192      ?record_version,
193      Name,
194      Pid,
195      Durable,
196      AutoDelete,
197      Owner,
198      Args,
199      VHost,
200      Options).
201
202-spec new(rabbit_amqqueue:name(),
203          pid() | none,
204          boolean(),
205          boolean(),
206          pid() | none,
207          rabbit_framing:amqp_table(),
208          rabbit_types:vhost() | undefined,
209          map(),
210          ?amqqueue_v1_type | classic) -> amqqueue().
211
212new(#resource{kind = queue} = Name,
213    Pid,
214    Durable,
215    AutoDelete,
216    Owner,
217    Args,
218    VHost,
219    Options,
220    Type)
221  when (is_pid(Pid) orelse Pid =:= none) andalso
222       is_boolean(Durable) andalso
223       is_boolean(AutoDelete) andalso
224       (is_pid(Owner) orelse Owner =:= none) andalso
225       is_list(Args) andalso
226       (is_binary(VHost) orelse VHost =:= undefined) andalso
227       is_map(Options) andalso
228       ?is_backwards_compat_classic(Type) ->
229    new(
230      Name,
231      Pid,
232      Durable,
233      AutoDelete,
234      Owner,
235      Args,
236      VHost,
237      Options).
238
239-spec new_with_version(amqqueue_v1,
240                       rabbit_amqqueue:name(),
241                       pid() | none,
242                       boolean(),
243                       boolean(),
244                       pid() | none,
245                       rabbit_framing:amqp_table(),
246                       rabbit_types:vhost() | undefined,
247                       map()) -> amqqueue().
248
249new_with_version(?record_version,
250                 #resource{kind = queue} = Name,
251                 Pid,
252                 Durable,
253                 AutoDelete,
254                 Owner,
255                 Args,
256                 VHost,
257                 Options)
258  when (is_pid(Pid) orelse Pid =:= none) andalso
259       is_boolean(Durable) andalso
260       is_boolean(AutoDelete) andalso
261       (is_pid(Owner) orelse Owner =:= none) andalso
262       is_list(Args) andalso
263       (is_binary(VHost) orelse VHost =:= undefined) andalso
264       is_map(Options) ->
265    #amqqueue{name            = Name,
266              durable         = Durable,
267              auto_delete     = AutoDelete,
268              arguments       = Args,
269              exclusive_owner = Owner,
270              pid             = Pid,
271              vhost           = VHost,
272              options         = Options}.
273
274-spec new_with_version(amqqueue_v1,
275                       rabbit_amqqueue:name(),
276                       pid() | none,
277                       boolean(),
278                       boolean(),
279                       pid() | none,
280                       rabbit_framing:amqp_table(),
281                       rabbit_types:vhost() | undefined,
282                       map(),
283                       ?amqqueue_v1_type | classic) -> amqqueue().
284
285new_with_version(?record_version,
286                 #resource{kind = queue} = Name,
287                 Pid,
288                 Durable,
289                 AutoDelete,
290                 Owner,
291                 Args,
292                 VHost,
293                 Options,
294                 Type)
295  when (is_pid(Pid) orelse Pid =:= none) andalso
296       is_boolean(Durable) andalso
297       is_boolean(AutoDelete) andalso
298       (is_pid(Owner) orelse Owner =:= none) andalso
299       is_list(Args) andalso
300       (is_binary(VHost) orelse VHost =:= undefined) andalso
301       is_map(Options) andalso
302       ?is_backwards_compat_classic(Type) ->
303    new_with_version(
304      ?record_version,
305      Name,
306      Pid,
307      Durable,
308      AutoDelete,
309      Owner,
310      Args,
311      VHost,
312      Options).
313
314-spec is_amqqueue(any()) -> boolean().
315
316is_amqqueue(#amqqueue{}) -> true;
317is_amqqueue(_)           -> false.
318
319-spec record_version_to_use() -> amqqueue_v1.
320
321record_version_to_use() ->
322    ?record_version.
323
324-spec upgrade(amqqueue()) -> amqqueue().
325
326upgrade(#amqqueue{} = Queue) -> Queue.
327
328-spec upgrade_to(amqqueue_v1, amqqueue()) -> amqqueue().
329
330upgrade_to(?record_version, #amqqueue{} = Queue) ->
331    Queue.
332
333% arguments
334
335-spec get_arguments(amqqueue()) -> rabbit_framing:amqp_table().
336
337get_arguments(#amqqueue{arguments = Args}) -> Args.
338
339-spec set_arguments(amqqueue(), rabbit_framing:amqp_table()) -> amqqueue().
340
341set_arguments(#amqqueue{} = Queue, Args) ->
342    Queue#amqqueue{arguments = Args}.
343
344% decorators
345
346-spec get_decorators(amqqueue()) -> [atom()] | none | undefined.
347
348get_decorators(#amqqueue{decorators = Decorators}) -> Decorators.
349
350-spec set_decorators(amqqueue(), [atom()] | none | undefined) -> amqqueue().
351
352set_decorators(#amqqueue{} = Queue, Decorators) ->
353    Queue#amqqueue{decorators = Decorators}.
354
355-spec get_exclusive_owner(amqqueue()) -> pid() | none.
356
357get_exclusive_owner(#amqqueue{exclusive_owner = Owner}) -> Owner.
358
359% gm_pids
360
361-spec get_gm_pids(amqqueue()) -> [{pid(), pid()}] | none.
362
363get_gm_pids(#amqqueue{gm_pids = GMPids}) -> GMPids.
364
365-spec set_gm_pids(amqqueue(), [{pid(), pid()}] | none) -> amqqueue().
366
367set_gm_pids(#amqqueue{} = Queue, GMPids) ->
368    Queue#amqqueue{gm_pids = GMPids}.
369
370-spec get_leader(amqqueue_v1()) -> no_return().
371
372get_leader(_) -> throw({unsupported, ?record_version, get_leader}).
373
374% operator_policy
375
376-spec get_operator_policy(amqqueue()) -> binary() | none | undefined.
377
378get_operator_policy(#amqqueue{operator_policy = OpPolicy}) -> OpPolicy.
379
380-spec set_operator_policy(amqqueue(), binary() | none | undefined) ->
381    amqqueue().
382
383set_operator_policy(#amqqueue{} = Queue, OpPolicy) ->
384    Queue#amqqueue{operator_policy = OpPolicy}.
385
386% name
387
388-spec get_name(amqqueue()) -> rabbit_amqqueue:name().
389
390get_name(#amqqueue{name = Name}) -> Name.
391
392-spec set_name(amqqueue(), rabbit_amqqueue:name()) -> amqqueue().
393
394set_name(#amqqueue{} = Queue, Name) ->
395    Queue#amqqueue{name = Name}.
396
397%% options
398
399-spec get_options(amqqueue()) -> map().
400
401get_options(#amqqueue{options = Options}) -> Options.
402
403-spec set_options(amqqueue(), map()) -> amqqueue().
404
405set_options(#amqqueue{} = Queue, Options) ->
406    Queue#amqqueue{options = Options}.
407
408% pid
409
410-spec get_pid
411(amqqueue_v1:amqqueue_v1()) -> pid() | none.
412
413get_pid(#amqqueue{pid = Pid}) -> Pid.
414
415-spec set_pid
416(amqqueue_v1:amqqueue_v1(), pid() | none) -> amqqueue_v1:amqqueue_v1().
417
418set_pid(#amqqueue{} = Queue, Pid) ->
419    Queue#amqqueue{pid = Pid}.
420
421% policy
422
423-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined.
424
425get_policy(#amqqueue{policy = Policy}) -> Policy.
426
427-spec set_policy(amqqueue(), binary() | none | undefined) -> amqqueue().
428
429set_policy(#amqqueue{} = Queue, Policy) ->
430    Queue#amqqueue{policy = Policy}.
431
432% policy_version
433
434-spec get_policy_version(amqqueue()) -> non_neg_integer().
435
436get_policy_version(#amqqueue{policy_version = PV}) ->
437    PV.
438
439-spec set_policy_version(amqqueue(), non_neg_integer()) -> amqqueue().
440
441set_policy_version(#amqqueue{} = Queue, PV) ->
442    Queue#amqqueue{policy_version = PV}.
443
444% recoverable_slaves
445
446-spec get_recoverable_slaves(amqqueue()) -> [atom()] | none.
447
448get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) ->
449    Slaves.
450
451-spec set_recoverable_slaves(amqqueue(), [atom()] | none) -> amqqueue().
452
453set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
454    Queue#amqqueue{recoverable_slaves = Slaves}.
455
456% type_state (new in v2)
457
458-spec get_type_state(amqqueue()) -> no_return().
459
460get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}).
461
462-spec set_type_state(amqqueue(), [node()]) -> no_return().
463
464set_type_state(_, _) ->
465    throw({unsupported, ?record_version, set_type_state}).
466
467% slave_pids
468
469get_slave_pids(#amqqueue{slave_pids = Slaves}) ->
470    Slaves.
471
472set_slave_pids(#amqqueue{} = Queue, SlavePids) ->
473    Queue#amqqueue{slave_pids = SlavePids}.
474
475% slave_pids_pending_shutdown
476
477get_slave_pids_pending_shutdown(
478  #amqqueue{slave_pids_pending_shutdown = Slaves}) ->
479    Slaves.
480
481set_slave_pids_pending_shutdown(#amqqueue{} = Queue, SlavePids) ->
482    Queue#amqqueue{slave_pids_pending_shutdown = SlavePids}.
483
484% state
485
486-spec get_state(amqqueue()) -> atom() | none.
487
488get_state(#amqqueue{state = State}) -> State.
489
490-spec set_state(amqqueue(), atom() | none) -> amqqueue().
491
492set_state(#amqqueue{} = Queue, State) ->
493    Queue#amqqueue{state = State}.
494
495% sync_slave_pids
496
497-spec get_sync_slave_pids(amqqueue()) -> [pid()] | none.
498
499get_sync_slave_pids(#amqqueue{sync_slave_pids = Pids}) ->
500    Pids.
501
502-spec set_sync_slave_pids(amqqueue(), [pid()] | none) -> amqqueue().
503
504set_sync_slave_pids(#amqqueue{} = Queue, Pids) ->
505    Queue#amqqueue{sync_slave_pids = Pids}.
506
507%% New in v2.
508
509-spec get_type(amqqueue()) -> atom().
510
511get_type(Queue) when ?is_amqqueue(Queue) -> ?amqqueue_v1_type.
512
513-spec get_vhost(amqqueue()) -> rabbit_types:vhost() | undefined.
514
515get_vhost(#amqqueue{vhost = VHost}) -> VHost.
516
517-spec is_auto_delete(amqqueue()) -> boolean().
518
519is_auto_delete(#amqqueue{auto_delete = AutoDelete}) -> AutoDelete.
520
521-spec is_durable(amqqueue()) -> boolean().
522
523is_durable(#amqqueue{durable = Durable}) -> Durable.
524
525-spec is_classic(amqqueue()) -> boolean().
526
527is_classic(Queue) ->
528    get_type(Queue) =:= ?amqqueue_v1_type.
529
530-spec is_quorum(amqqueue()) -> boolean().
531
532is_quorum(Queue) when ?is_amqqueue(Queue) ->
533    false.
534
535fields() -> fields(?record_version).
536
537fields(?record_version) -> record_info(fields, amqqueue).
538
539field_vhost() -> #amqqueue.vhost.
540
541-spec pattern_match_all() -> amqqueue_pattern().
542
543pattern_match_all() -> #amqqueue{_ = '_'}.
544
545-spec pattern_match_on_name(rabbit_amqqueue:name()) ->
546    amqqueue_pattern().
547
548pattern_match_on_name(Name) -> #amqqueue{name = Name, _ = '_'}.
549
550-spec pattern_match_on_type(atom()) -> no_return().
551
552pattern_match_on_type(_) ->
553    throw({unsupported, ?record_version, pattern_match_on_type}).
554
555reset_mirroring_and_decorators(#amqqueue{} = Queue) ->
556    Queue#amqqueue{slave_pids      = [],
557                   sync_slave_pids = [],
558                   gm_pids         = [],
559                   decorators      = undefined}.
560
561set_immutable(#amqqueue{} = Queue) ->
562    Queue#amqqueue{pid                = none,
563                   slave_pids         = none,
564                   sync_slave_pids    = none,
565                   recoverable_slaves = none,
566                   gm_pids            = none,
567                   policy             = none,
568                   decorators         = none,
569                   state              = none}.
570
571-spec qnode(amqqueue() | pid()) -> node().
572
573qnode(Queue) when ?is_amqqueue(Queue) ->
574    QPid = get_pid(Queue),
575    qnode(QPid);
576qnode(QPid) when is_pid(QPid) ->
577    node(QPid).
578
579macros() ->
580    io:format(
581      "-define(is_~s(Q), is_record(Q, amqqueue, ~b)).~n~n",
582      [?record_version, record_info(size, amqqueue)]),
583    %% The field number starts at 2 because the first element is the
584    %% record name.
585    macros(record_info(fields, amqqueue), 2).
586
587macros([Field | Rest], I) ->
588    io:format(
589      "-define(~s_field_~s(Q), element(~b, Q)).~n",
590      [?record_version, Field, I]),
591    macros(Rest, I + 1);
592macros([], _) ->
593    ok.
594