1-module(rabbit_confirms).
2
3-compile({no_auto_import, [size/1]}).
4
5-include_lib("rabbit_common/include/rabbit.hrl").
6
7-export([init/0,
8         insert/4,
9         confirm/3,
10         reject/2,
11
12         remove_queue/2,
13
14         smallest/1,
15         size/1,
16         is_empty/1]).
17
%% Sequence number identifying a published message awaiting settlement.
-type seq_no() :: non_neg_integer().
-type queue_name() :: rabbit_amqqueue:name().
-type exchange_name() :: rabbit_exchange:name().

%% Tracker state.
%%  smallest    - the sequence number the module currently regards as the
%%                lowest tracked one; undefined when nothing is tracked.
%%  unconfirmed - for every tracked sequence number, the exchange it was
%%                inserted with and the queues (map keys, value always ok)
%%                that have not yet confirmed it.
-record(?MODULE, {smallest  :: undefined | seq_no(),
                  unconfirmed = #{} :: #{seq_no() =>
                                         {exchange_name(),
                                          #{queue_name() => ok}}}
                  }).

%% A settled message as returned by confirm/3, reject/2 and remove_queue/2:
%% its sequence number paired with the exchange it was inserted with.
-type mx() :: {seq_no(), exchange_name()}.

-opaque state() :: #?MODULE{}.

%% mx/0 is exported because it appears in the specs of exported functions,
%% so callers can refer to it in their own specs.
-export_type([
              state/0,
              mx/0
              ]).
35
%% Creates an empty tracker: no smallest sequence number and nothing
%% unconfirmed.
-spec init() -> state().
init() ->
    #?MODULE{smallest = undefined,
             unconfirmed = #{}}.
39
%% Starts tracking SeqNo, published to exchange XName and awaiting a
%% confirm from every queue in QNames.
%%
%% Fails with function_clause when SeqNo is not an integer, QNames is not
%% a list, XName is not an exchange resource, or SeqNo is already tracked.
%%
%% The tracked smallest sequence number is lowered when SeqNo is below it,
%% so it stays correct even if sequence numbers are not inserted in
%% increasing order.
-spec insert(seq_no(), [queue_name()], exchange_name(), state()) ->
    state().
insert(SeqNo, QNames, #resource{kind = exchange} = XName,
       #?MODULE{smallest = S0,
                unconfirmed = U0} = State)
  when is_integer(SeqNo)
       andalso is_list(QNames)
       andalso not is_map_key(SeqNo, U0) ->
    %% the queue names are stored as map keys so they behave as a set
    U = U0#{SeqNo => {XName, maps:from_list([{Q, ok} || Q <- QNames])}},
    S = case S0 of
            undefined -> SeqNo;
            _ -> min(S0, SeqNo)
        end,
    State#?MODULE{smallest = S,
                  unconfirmed = U}.
55
%% Records that queue QName has confirmed each sequence number in SeqNos.
%%
%% Returns the messages for which QName was the last outstanding queue
%% (in the same relative order as SeqNos) together with the updated state.
%% Sequence numbers that are not tracked are ignored.
-spec confirm([seq_no()], queue_name(), state()) ->
    {[mx()], state()}.
confirm(SeqNos, QName, #?MODULE{smallest = PrevSmallest,
                                unconfirmed = Pending0} = State)
  when is_list(SeqNos) ->
    Step = fun (SeqNo, Acc) -> confirm_one(SeqNo, QName, Acc) end,
    {Settled, Pending} = lists:foldr(Step, {[], Pending0}, SeqNos),
    %% the smallest only has to be recomputed when it was among the
    %% messages that just became fully confirmed
    %% TODO: this could be detected inside the fold above instead
    NewSmallest = case lists:keymember(PrevSmallest, 1, Settled) of
                      true -> next_smallest(PrevSmallest, Pending);
                      false -> PrevSmallest
                  end,
    {Settled, State#?MODULE{smallest = NewSmallest,
                            unconfirmed = Pending}}.
77
%% Stops tracking SeqNo regardless of which queues are still outstanding.
%%
%% Returns {ok, {SeqNo, XName}, State} when SeqNo was tracked, otherwise
%% {error, not_found}.
-spec reject(seq_no(), state()) ->
    {ok, mx(), state()} | {error, not_found}.
reject(SeqNo, #?MODULE{smallest = PrevSmallest,
                       unconfirmed = Pending0} = State)
  when is_integer(SeqNo) ->
    case maps:take(SeqNo, Pending0) of
        error ->
            {error, not_found};
        {{XName, _Queues}, Pending} ->
            NewSmallest =
                if
                    SeqNo =:= PrevSmallest ->
                        %% the smallest itself was removed: find the next one
                        next_smallest(PrevSmallest, Pending);
                    true ->
                        PrevSmallest
                end,
            {ok, {SeqNo, XName}, State#?MODULE{smallest = NewSmallest,
                                               unconfirmed = Pending}}
    end.
97
%% Treats queue QName as having confirmed every message that is still
%% waiting on it, returning the messages that thereby became fully
%% confirmed and the updated state.
%% Idempotent: once applied, no tracked message references QName, so a
%% repeated call confirms nothing further.
-spec remove_queue(queue_name(), state()) ->
    {[mx()], state()}.
remove_queue(QName, #?MODULE{unconfirmed = U} = State) ->
    WaitsOnQueue = fun (_SeqNo, {_XName, QS}) -> maps:is_key(QName, QS) end,
    Affected = maps:keys(maps:filter(WaitsOnQueue, U)),
    confirm(lists:sort(Affected), QName, State).
112
%% Returns the smallest sequence number recorded in the state, or
%% undefined when none is recorded.
-spec smallest(state()) -> seq_no() | undefined.
smallest(#?MODULE{} = State) ->
    State#?MODULE.smallest.
116
%% Returns the number of sequence numbers currently tracked.
-spec size(state()) -> non_neg_integer().
size(#?MODULE{} = State) ->
    Unconfirmed = State#?MODULE.unconfirmed,
    maps:size(Unconfirmed).
120
%% Returns true when no sequence numbers are tracked.
-spec is_empty(state()) -> boolean().
is_empty(State) ->
    case size(State) of
        0 -> true;
        _ -> false
    end.
124
125%% INTERNAL
126
%% Applies a confirm from QName for a single SeqNo to the accumulator
%% {Confirmed, Pending}.
%%  - SeqNo not tracked: the accumulator is returned unchanged.
%%  - QName is the only queue still outstanding: the entry is dropped and
%%    {SeqNo, XName} is prepended to Confirmed.
%%  - otherwise: QName is removed from the entry's outstanding queues.
confirm_one(SeqNo, QName, {Confirmed, Pending} = Acc) ->
    case maps:take(SeqNo, Pending) of
        error ->
            Acc;
        {{XName, #{QName := _} = Queues}, Rest}
          when map_size(Queues) =:= 1 ->
            %% last queue confirm
            {[{SeqNo, XName} | Confirmed], Rest};
        {{XName, Queues}, Rest} ->
            Remaining = maps:remove(QName, Queues),
            {Confirmed, Rest#{SeqNo => {XName, Remaining}}}
    end.
139
%% Finds the new smallest key of U after the previous smallest, S, was
%% removed. Returns undefined when U is empty.
%%
%% Keys are normally all >= S and close to it, so the search walks upwards
%% from S. The walk is limited to map_size(U) steps; if no key is found
%% within that budget (large gap, or every key is below S) the true minimum
%% key is computed directly. This guarantees termination for any input,
%% whereas an unbounded upward walk never ends when all keys are below S.
next_smallest(_S, U) when map_size(U) =:= 0 ->
    undefined;
next_smallest(S, U) ->
    scan_smallest(S, U, map_size(U)).

%% Bounded upward scan with a linear fallback once the budget is used up.
scan_smallest(S, U, _Budget) when is_map_key(S, U) ->
    S;
scan_smallest(_S, U, 0) ->
    lists:min(maps:keys(U));
scan_smallest(S, U, Budget) ->
    scan_smallest(S + 1, U, Budget - 1).
147
148
149
150-ifdef(TEST).
151-include_lib("eunit/include/eunit.hrl").
152-endif.
153