1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(rabbit_msg_store_gc).
9
10-behaviour(gen_server2).
11
12-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).
13
14-export([set_maximum_since_use/2]).
15
16-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
17         terminate/2, code_change/3, prioritise_cast/3]).
18
19-record(state,
20        { pending_no_readers,
21          on_action,
22          msg_store_state
23        }).
24
25-include_lib("rabbit_common/include/rabbit.hrl").
26
27%%----------------------------------------------------------------------------
28
29-spec start_link(rabbit_msg_store:gc_state()) ->
30                           rabbit_types:ok_pid_or_error().
31
32start_link(MsgStoreState) ->
33    gen_server2:start_link(?MODULE, [MsgStoreState],
34                           [{timeout, infinity}]).
35
36-spec combine(pid(), rabbit_msg_store:file_num(),
37                    rabbit_msg_store:file_num()) -> 'ok'.
38
39combine(Server, Source, Destination) ->
40    gen_server2:cast(Server, {combine, Source, Destination}).
41
42-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.
43
44delete(Server, File) ->
45    gen_server2:cast(Server, {delete, File}).
46
47-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.
48
49no_readers(Server, File) ->
50    gen_server2:cast(Server, {no_readers, File}).
51
52-spec stop(pid()) -> 'ok'.
53
54stop(Server) ->
55    gen_server2:call(Server, stop, infinity).
56
57-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
58
59set_maximum_since_use(Pid, Age) ->
60    gen_server2:cast(Pid, {set_maximum_since_use, Age}).
61
62%%----------------------------------------------------------------------------
63
64init([MsgStoreState]) ->
65    ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
66                                             [self()]),
67    {ok, #state { pending_no_readers = #{},
68                  on_action          = [],
69                  msg_store_state    = MsgStoreState }, hibernate,
70     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
71
72prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
73prioritise_cast(_Msg,                          _Len, _State) -> 0.
74
75handle_call(stop, _From, State) ->
76    {stop, normal, ok, State}.
77
78handle_cast({combine, Source, Destination}, State) ->
79    {noreply, attempt_action(combine, [Source, Destination], State), hibernate};
80
81handle_cast({delete, File}, State) ->
82    {noreply, attempt_action(delete, [File], State), hibernate};
83
84handle_cast({no_readers, File},
85            State = #state { pending_no_readers = Pending }) ->
86    {noreply, case maps:find(File, Pending) of
87                  error ->
88                      State;
89                  {ok, {Action, Files}} ->
90                      Pending1 = maps:remove(File, Pending),
91                      attempt_action(
92                        Action, Files,
93                        State #state { pending_no_readers = Pending1 })
94              end, hibernate};
95
96handle_cast({set_maximum_since_use, Age}, State) ->
97    ok = file_handle_cache:set_maximum_since_use(Age),
98    {noreply, State, hibernate}.
99
100handle_info(Info, State) ->
101    {stop, {unhandled_info, Info}, State}.
102
103terminate(_Reason, State) ->
104    State.
105
106code_change(_OldVsn, State, _Extra) ->
107    {ok, State}.
108
109attempt_action(Action, Files,
110               State = #state { pending_no_readers = Pending,
111                                on_action          = Thunks,
112                                msg_store_state    = MsgStoreState }) ->
113    case do_action(Action, Files, MsgStoreState) of
114        {ok, OkThunk} ->
115            State#state{on_action = lists:filter(fun (Thunk) -> not Thunk() end,
116                                                 [OkThunk | Thunks])};
117        {defer, [File | _]} ->
118            Pending1 = maps:put(File, {Action, Files}, Pending),
119            State #state { pending_no_readers = Pending1 }
120    end.
121
122do_action(combine, [Source, Destination], MsgStoreState) ->
123    rabbit_msg_store:combine_files(Source, Destination, MsgStoreState);
124do_action(delete, [File], MsgStoreState) ->
125    rabbit_msg_store:delete_file(File, MsgStoreState).
126