%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_msg_store_gc).

-behaviour(gen_server2).

%% Client API.
-export([start_link/1, combine/3, delete/2, no_readers/2, stop/1]).

%% Registered with file_handle_cache from init/1.
-export([set_maximum_since_use/2]).

%% gen_server2 callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3, prioritise_cast/3]).

-record(state, {
    %% Map of File => {Action, Files}: actions that were deferred and are
    %% retried when a no_readers cast for File arrives.
    pending_no_readers,
    %% List of zero-arity funs. They are re-evaluated after each successful
    %% action and kept for as long as they return false.
    on_action,
    %% Opaque state handed through to the rabbit_msg_store calls.
    msg_store_state
}).

-include_lib("rabbit_common/include/rabbit.hrl").

%%----------------------------------------------------------------------------
%% Client API
%%----------------------------------------------------------------------------

%% Starts a linked GC server. MsgStoreState reaches init/1 as the sole
%% element of the argument list; the start timeout is infinity.
-spec start_link(rabbit_msg_store:gc_state()) ->
          rabbit_types:ok_pid_or_error().

start_link(MsgStoreState) ->
    InitArgs = [MsgStoreState],
    StartOpts = [{timeout, infinity}],
    gen_server2:start_link(?MODULE, InitArgs, StartOpts).

%% Asynchronously asks Server to perform a combine action on the files
%% Source and Destination.
-spec combine(pid(), rabbit_msg_store:file_num(),
              rabbit_msg_store:file_num()) -> 'ok'.

combine(Server, Source, Destination) ->
    Request = {combine, Source, Destination},
    gen_server2:cast(Server, Request).

%% Asynchronously asks Server to perform a delete action on File.
-spec delete(pid(), rabbit_msg_store:file_num()) -> 'ok'.

delete(Server, File) ->
    Request = {delete, File},
    gen_server2:cast(Server, Request).

%% Asynchronously tells Server that File has no readers, so that any action
%% it had deferred on File can be attempted again.
-spec no_readers(pid(), rabbit_msg_store:file_num()) -> 'ok'.

no_readers(Server, File) ->
    Request = {no_readers, File},
    gen_server2:cast(Server, Request).

%% Synchronously stops Server, waiting indefinitely for its reply.
-spec stop(pid()) -> 'ok'.

stop(Server) ->
    gen_server2:call(Server, stop, infinity).

%% Asynchronously forwards Age to the GC server Pid, which passes it on to
%% file_handle_cache:set_maximum_since_use/1 from inside its own process.
-spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.

set_maximum_since_use(Pid, Age) ->
    Request = {set_maximum_since_use, Age},
    gen_server2:cast(Pid, Request).
%%----------------------------------------------------------------------------
%% gen_server2 callbacks
%%----------------------------------------------------------------------------

%% Registers set_maximum_since_use with file_handle_cache (passing our own
%% pid as the extra argument) and starts with no deferred actions and no
%% thunks. The server begins hibernated, using the backoff settings taken
%% from the rabbit.hrl macros.
init([MsgStoreState]) ->
    ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
                                             [self()]),
    InitialState = #state{pending_no_readers = #{},
                          on_action          = [],
                          msg_store_state    = MsgStoreState},
    Backoff = {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
               ?DESIRED_HIBERNATE},
    {ok, InitialState, hibernate, Backoff}.

%% set_maximum_since_use casts get priority 8; every other cast gets 0.
%% NOTE(review): presumably this lets file handle cache requests overtake
%% queued GC work -- confirm against gen_server2's priority semantics.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) ->
    8;
prioritise_cast(_Msg, _Len, _State) ->
    0.

%% The only supported call is stop: reply ok and terminate normally.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State}.

%% Every clause returns hibernate once the cast has been handled.
handle_cast({combine, Source, Destination}, State) ->
    NewState = attempt_action(combine, [Source, Destination], State),
    {noreply, NewState, hibernate};

handle_cast({delete, File}, State) ->
    NewState = attempt_action(delete, [File], State),
    {noreply, NewState, hibernate};

%% If an action was deferred on File, forget the deferral and retry it;
%% otherwise the notification is ignored.
handle_cast({no_readers, File},
            State = #state{pending_no_readers = Pending}) ->
    NewState =
        case maps:take(File, Pending) of
            {{Action, Files}, Remaining} ->
                attempt_action(Action, Files,
                               State#state{pending_no_readers = Remaining});
            error ->
                State
        end,
    {noreply, NewState, hibernate};

handle_cast({set_maximum_since_use, Age}, State) ->
    ok = file_handle_cache:set_maximum_since_use(Age),
    {noreply, State, hibernate}.

%% No plain messages are expected: any that arrives stops the server with
%% reason {unhandled_info, Info}.
handle_info(Info, State) ->
    {stop, {unhandled_info, Info}, State}.

%% Nothing to clean up; the state is returned unchanged.
terminate(_Reason, State) ->
    State.

%% No state migration is needed between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%----------------------------------------------------------------------------
%% Internal
%%----------------------------------------------------------------------------

%% Runs Action on Files via do_action/3 and returns the updated state.
%%
%% {defer, [File | _]}: the action is remembered under the first file of the
%%   returned list, to be retried when a no_readers cast for that file
%%   arrives.
%% {ok, Thunk}: Thunk and all previously stored thunks are evaluated, newest
%%   first; only those that return false are kept for a later round.
attempt_action(Action, Files,
               State = #state{pending_no_readers = Pending,
                              on_action          = Thunks,
                              msg_store_state    = MsgStoreState}) ->
    case do_action(Action, Files, MsgStoreState) of
        {defer, [BlockedFile | _]} ->
            Deferred = Pending#{BlockedFile => {Action, Files}},
            State#state{pending_no_readers = Deferred};
        {ok, OkThunk} ->
            Unfinished = [Thunk || Thunk <- [OkThunk | Thunks], not Thunk()],
            State#state{on_action = Unfinished}
    end.
%% Maps a GC action and its file list onto the matching rabbit_msg_store
%% call and returns that call's result untouched. The caller,
%% attempt_action/3, matches the result against {ok, Thunk} and
%% {defer, [File | _]}.
%%
%%   combine, [SrcFile, DstFile] -> rabbit_msg_store:combine_files/3
%%   delete,  [Victim]           -> rabbit_msg_store:delete_file/2
%%
%% Any other action/file-list shape is a programming error and fails with
%% function_clause.
do_action(combine, [SrcFile, DstFile], GcState) ->
    rabbit_msg_store:combine_files(SrcFile, DstFile, GcState);
do_action(delete, [Victim], GcState) ->
    rabbit_msg_store:delete_file(Victim, GcState).