1-module(rabbit_fifo_index).
2
3-export([
4         empty/0,
5         exists/2,
6         append/2,
7         delete/2,
8         size/1,
9         smallest/1,
10         map/2
11        ]).
12
13-compile({no_auto_import, [size/1]}).
14
15%% the empty atom is a lot smaller (4 bytes) than e.g. `undefined` (13 bytes).
16%% This matters as the data map gets persisted as part of the snapshot
17-define(NIL, '').
18
%% Index state (the record is named after the module):
%%   data     - the indexed keys. The map is used as a set: append/2 stores
%%              ?NIL as the value of every key it adds.
%%   smallest - the lowest key currently held in data, or undefined while the
%%              index is empty. delete/2 advances it when that key is removed.
%%   largest  - the key given to the most recent append/2, or undefined while
%%              the index is empty. It is only reset when delete/2 removes the
%%              last key; deleting the largest key while other keys remain
%%              leaves it untouched, so it is an upper bound rather than
%%              necessarily a key that is still present.
-record(?MODULE, {data = #{} :: #{integer() => ?NIL},
                  smallest :: undefined | non_neg_integer(),
                  largest :: undefined | non_neg_integer()
                 }).
23
24
25-opaque state() :: #?MODULE{}.
26
27-export_type([state/0]).
28
%% Returns an index that holds no keys: the key map is empty and neither a
%% smallest nor a largest key is tracked yet.
-spec empty() -> state().
empty() ->
    #?MODULE{data = #{},
             smallest = undefined,
             largest = undefined}.
32
%% Returns true when Key is currently held in the index, false otherwise.
-spec exists(integer(), state()) -> boolean().
exists(Key, #?MODULE{data = Keys}) ->
    is_map_key(Key, Keys).
36
%% Adds Key to the index. Only integer keys are supported, and a key has to be
%% greater than the largest key recorded so far (or the index must have no
%% largest key yet): there is no clause for any other case, so such a call
%% fails with function_clause.
-spec append(integer(), state()) -> state().
append(Key, #?MODULE{data = Keys, smallest = Min, largest = Max} = Index)
  when Max =:= undefined; Key > Max ->
    Index#?MODULE{data = Keys#{Key => ?NIL},
                  %% ra_lib:default/2 lives outside this module; it is relied
                  %% on to keep an already set smallest key and to fall back
                  %% to Key when none is set
                  smallest = ra_lib:default(Min, Key),
                  largest = Key}.
47
%% Removes a key from the index.
%%
%% When the key is the current smallest key, the next smallest key is looked
%% up and recorded; if no key is left both smallest and largest are reset to
%% undefined. Deleting any other key (including one that is not present) only
%% removes it from the key map: smallest and largest are left as they are.
-spec delete(Index :: integer(), state()) -> state().
delete(Smallest, #?MODULE{data = Data0,
                          largest = Largest,
                          smallest = Smallest} = State) ->
    Data = maps:remove(Smallest, Data0),
    %% computed before the emptiness check so that a non-numeric key still
    %% fails here exactly as it did before
    From = Smallest + 1,
    Next = case map_size(Data) of
               0 ->
                   %% nothing left to find. largest is not lowered when the
                   %% largest key is deleted, so scanning From..Largest here
                   %% could walk an arbitrarily large range for no result
                   undefined;
               _ ->
                   find_next(From, Largest, Data)
           end,
    case Next of
        undefined ->
            State#?MODULE{data = Data,
                          smallest = undefined,
                          largest = undefined};
        _ ->
            State#?MODULE{data = Data, smallest = Next}
    end;
delete(Key, #?MODULE{data = Data} = State) ->
    State#?MODULE{data = maps:remove(Key, Data)}.
63
%% Returns the number of keys currently held in the index.
-spec size(state()) -> non_neg_integer().
size(#?MODULE{data = Keys}) ->
    map_size(Keys).
67
%% Returns the smallest key recorded in the index state, or undefined when no
%% smallest key is set.
-spec smallest(state()) -> undefined | integer().
smallest(#?MODULE{} = Index) ->
    Index#?MODULE.smallest.
71
72
%% Applies F to every entry of the key map through maps:map/2, i.e. F is
%% called as F(Key, Value) and its result becomes the value stored for Key.
%% The set of keys, and the smallest and largest fields, are not changed.
-spec map(fun(), state()) -> state().
map(F, #?MODULE{data = Entries0} = Index) ->
    Entries = maps:map(F, Entries0),
    Index#?MODULE{data = Entries}.
76
77
78%% internal
79
%% Probes Candidate, Candidate + 1, ... up to and including Last and returns
%% the first of them that is a key of Map, or undefined when none is.
find_next(Candidate, Last, _Map) when Candidate > Last ->
    undefined;
find_next(Candidate, _Last, Map) when is_map_key(Candidate, Map) ->
    Candidate;
find_next(Candidate, Last, Map) ->
    % one probe per integer in the range: a wide range with few keys in it
    % makes this slow. With fifo-ish deletion the next key is expected to be
    % close to the one just removed, so the common case stays short
    find_next(Candidate + 1, Last, Map).
93
94-ifdef(TEST).
95-include_lib("eunit/include/eunit.hrl").
96
%% Walks an index through two appends and two deletes, checking exists/2,
%% size/1 and smallest/1 after every step.
append_test() ->
    S0 = empty(),
    ?assertNot(exists(99, S0)),
    ?assertEqual(undefined, smallest(S0)),
    ?assertEqual(0, size(S0)),
    S1 = append(1, S0),
    ?assertNot(exists(99, S1)),
    ?assert(exists(1, S1)),
    ?assertEqual(1, size(S1)),
    ?assertEqual(1, smallest(S1)),
    S2 = append(2, S1),
    ?assert(exists(2, S2)),
    ?assertEqual(2, size(S2)),
    ?assertEqual(1, smallest(S2)),
    S3 = delete(1, S2),
    ?assertNot(exists(1, S3)),
    ?assertEqual(2, smallest(S3)),
    ?assertEqual(1, size(S3)),
    S4 = delete(2, S3),
    ?assertNot(exists(2, S4)),
    ?assertEqual(undefined, smallest(S4)),
    %% check the emptied index itself; asserting on the initial S0 here would
    %% pass regardless of what delete/2 did
    ?assertEqual(0, size(S4)),
    ok.
118
119-endif.
120