%% Bookkeeping for sequence numbers that are still waiting on queues.
%%
%% For every inserted sequence number the state remembers the exchange it
%% was inserted with and the set of queue names that have not yet confirmed
%% it. The state also caches the smallest outstanding sequence number so
%% that `smallest/1' does not have to scan the map.
-module(rabbit_confirms).

%% This module defines its own size/1, which would otherwise clash with the
%% auto-imported erlang:size/1.
-compile({no_auto_import, [size/1]}).

-include_lib("rabbit_common/include/rabbit.hrl").

-export([init/0,
         insert/4,
         confirm/3,
         reject/2,

         remove_queue/2,

         smallest/1,
         size/1,
         is_empty/1]).

-type seq_no() :: non_neg_integer().
-type queue_name() :: rabbit_amqqueue:name().
-type exchange_name() :: rabbit_exchange:name().

%% smallest    - the lowest key of `unconfirmed', or `undefined' when the
%%               map is empty.
%% unconfirmed - seq_no => {exchange, set of queues still outstanding};
%%               the set is represented as a map whose values are all `ok'.
-record(?MODULE, {smallest :: undefined | seq_no(),
                  unconfirmed = #{} :: #{seq_no() =>
                                         {exchange_name(),
                                          #{queue_name() => ok}}}
                 }).

-type mx() :: {seq_no(), exchange_name()}.

-opaque state() :: #?MODULE{}.

-export_type([
              state/0
             ]).

%% Returns an empty state: nothing outstanding, `smallest' is `undefined'.
-spec init() -> state().
init() ->
    #?MODULE{}.

%% Registers SeqNo as outstanding for exchange XName and every queue in
%% QNames.
%%
%% Fails with `function_clause' when SeqNo is already tracked, when SeqNo is
%% not an integer, when QNames is not a list, or when XName is not an
%% exchange resource.
%%
%% The cached smallest sequence number is lowered when SeqNo is below it, so
%% the cache stays correct even if sequence numbers are inserted out of
%% order.
-spec insert(seq_no(), [queue_name()], exchange_name(), state()) ->
    state().
insert(SeqNo, QNames, #resource{kind = exchange} = XName,
       #?MODULE{smallest = S0,
                unconfirmed = U0} = State)
  when is_integer(SeqNo)
       andalso is_list(QNames)
       andalso not is_map_key(SeqNo, U0) ->
    QS = maps:from_list([{Q, ok} || Q <- QNames]),
    State#?MODULE{smallest = smallest_after_insert(S0, SeqNo),
                  unconfirmed = U0#{SeqNo => {XName, QS}}}.

%% Records that queue QName has confirmed each sequence number in SeqNos.
%%
%% Returns the `{SeqNo, XName}' pairs for which QName was the last
%% outstanding queue (in the same relative order as SeqNos), together with
%% the updated state. Sequence numbers that are unknown, or that are not
%% waiting on QName, are ignored.
-spec confirm([seq_no()], queue_name(), state()) ->
    {[mx()], state()}.
confirm(SeqNos, QName, #?MODULE{smallest = Smallest0,
                                unconfirmed = U0} = State)
  when is_list(SeqNos) ->
    %% foldr + prepend keeps Confirmed in the order of SeqNos
    {Confirmed, U} = lists:foldr(
                       fun (SeqNo, Acc) ->
                               confirm_one(SeqNo, QName, Acc)
                       end, {[], U0}, SeqNos),
    %% The cached smallest only has to be recomputed when the entry it
    %% refers to has just been fully confirmed.
    %% TODO: this can be optimised by checking in the preceding foldr
    Smallest = case lists:keymember(Smallest0, 1, Confirmed) of
                   true ->
                       next_smallest(Smallest0, U);
                   false ->
                       Smallest0
               end,
    {Confirmed, State#?MODULE{smallest = Smallest,
                              unconfirmed = U}}.

%% Drops SeqNo from the state regardless of how many queues are still
%% outstanding for it.
%%
%% Returns `{ok, {SeqNo, XName}, State}' when SeqNo was tracked and
%% `{error, not_found}' otherwise.
-spec reject(seq_no(), state()) ->
    {ok, mx(), state()} | {error, not_found}.
reject(SeqNo, #?MODULE{smallest = Smallest0,
                       unconfirmed = U0} = State)
  when is_integer(SeqNo) ->
    case maps:take(SeqNo, U0) of
        {{XName, _QS}, U} ->
            Smallest = case SeqNo of
                           Smallest0 ->
                               %% the smallest entry was removed: find the
                               %% next one
                               next_smallest(Smallest0, U);
                           _ ->
                               Smallest0
                       end,
            {ok, {SeqNo, XName}, State#?MODULE{unconfirmed = U,
                                               smallest = Smallest}};
        error ->
            {error, not_found}
    end.

%% Treats QName as having confirmed every sequence number that is still
%% waiting on it. Returns the same shape as confirm/3.
%%
%% Idempotent: a second call with the same QName finds no entry that still
%% references the queue and returns `{[], State}'.
-spec remove_queue(queue_name(), state()) ->
    {[mx()], state()}.
remove_queue(QName, #?MODULE{unconfirmed = U} = State) ->
    SeqNos = maps:fold(
               fun (SeqNo, {_XName, QS}, Acc) when is_map_key(QName, QS) ->
                       [SeqNo | Acc];
                   (_SeqNo, {_XName, _QS}, Acc) ->
                       Acc
               end, [], U),
    %% maps:fold/3 visits keys in an unspecified order; sort so that the
    %% confirmations come back in ascending sequence-number order.
    confirm(lists:sort(SeqNos), QName, State).

%% Returns the smallest outstanding sequence number, or `undefined' when
%% nothing is outstanding.
-spec smallest(state()) -> seq_no() | undefined.
smallest(#?MODULE{smallest = Smallest}) ->
    Smallest.

%% Returns the number of outstanding sequence numbers.
-spec size(state()) -> non_neg_integer().
size(#?MODULE{unconfirmed = U}) ->
    maps:size(U).

%% Returns `true' when no sequence number is outstanding.
-spec is_empty(state()) -> boolean().
is_empty(State) ->
    size(State) =:= 0.

%% INTERNAL

%% New cached smallest after inserting SeqNo.
smallest_after_insert(undefined, SeqNo) ->
    SeqNo;
smallest_after_insert(Smallest, SeqNo) ->
    min(Smallest, SeqNo).

%% Applies a single confirm from QName for SeqNo to the `{Confirmed, Map}'
%% accumulator used by confirm/3.
confirm_one(SeqNo, QName, {Acc, U}) ->
    case U of
        #{SeqNo := {XName, QS}}
          when is_map_key(QName, QS)
               andalso map_size(QS) =:= 1 ->
            %% last queue confirm
            {[{SeqNo, XName} | Acc], maps:remove(SeqNo, U)};
        #{SeqNo := {XName, QS}}
          when is_map_key(QName, QS) ->
            %% other queues are still outstanding
            {Acc, U#{SeqNo := {XName, maps:remove(QName, QS)}}};
        #{} ->
            %% unknown SeqNo, or SeqNo is not waiting on QName
            {Acc, U}
    end.

%% Finds the smallest key of U, given that S was the smallest key until it
%% was removed. Returns `undefined' for an empty map.
%%
%% Sequence numbers are usually dense, so probing S, S+1, ... is normally
%% the cheapest way to find the next key. The probe is limited to
%% map_size(U) steps; after that the minimum is computed directly, which
%% bounds the total work by the size of the map and guarantees termination
%% whatever S is.
next_smallest(_S, U) when map_size(U) =:= 0 ->
    undefined;
next_smallest(S, U) ->
    probe_smallest(S, map_size(U), U).

probe_smallest(S, _Budget, U) when is_map_key(S, U) ->
    S;
probe_smallest(S, Budget, U) when Budget > 0 ->
    probe_smallest(S + 1, Budget - 1, U);
probe_smallest(_S, _Budget, U) ->
    lists:min(maps:keys(U)).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.