%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved.

-module(rabbit_mgmt_db_cache).

-behaviour(gen_server).

%% API functions
-export([start_link/1]).
-export([process_name/1,
         fetch/2,
         fetch/3,
         fetch/4]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% data       - the cached value, or 'none' when the cache is empty
%% args       - the fetch fun arguments the cached value was computed with
%% timer_ref  - timer that will purge the cache, if one is running
%% multiplier - cache TTL = time taken to produce the value * multiplier
-record(state, {data :: any() | none,
                args :: [any()],
                timer_ref :: undefined | reference(),
                multiplier :: integer()}).

-type error_desc() :: key_not_found | timeout | {throw, atom()}.
-type fetch_fun() :: fun((_) -> any()) | fun(() -> any()).
-type fetch_ret() :: {ok, any()} | {error, error_desc()}.

-define(DEFAULT_MULT, 5).
-define(DEFAULT_TIMEOUT, 60000).
-define(CHILD(Key), {rabbit_mgmt_db_cache:process_name(Key),
                     {rabbit_mgmt_db_cache, start_link, [Key]},
                     permanent, 5000, worker,
                     [rabbit_mgmt_db_cache]}).
-define(RESET_STATE(State), State#state{data = none, args = []}).

%% Implements an adaptive cache: the value-generating fun is timed, and
%% its return value is cached for the time it took to produce multiplied
%% by some factor (defaults to 5).
%% There is one cache process per key. New processes are started as
%% required. The cache is invalidated if the arguments to the fetch
%% fun have changed.
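%%
%% Example usage (hypothetical key and fetch fun; any fun whose arity
%% matches the argument list works):
%%
%%   {ok, Overview} = rabbit_mgmt_db_cache:fetch(
%%                      overview,
%%                      fun(Ranges) -> expensive_overview(Ranges) end,
%%                      [Ranges0]),
%%
%% A second call with the same key and arguments within the cache TTL
%% returns the cached value without running the fun again; calling with
%% different arguments invalidates the cache and recomputes.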


%%%===================================================================
%%% API functions
%%%===================================================================

-spec fetch(atom(), fetch_fun()) -> fetch_ret().
fetch(Key, FetchFun) ->
    fetch(Key, FetchFun, []).

-spec fetch(atom(), fetch_fun(), [any()]) -> fetch_ret().
fetch(Key, FetchFun, Args) when is_list(Args) ->
    fetch(Key, FetchFun, Args, ?DEFAULT_TIMEOUT).

%% Fetches the value for Key, starting the per-key cache process on
%% demand if it is not already running.
-spec fetch(atom(), fetch_fun(), [any()], integer()) -> fetch_ret().
fetch(Key, FetchFun, FunArgs, Timeout) ->
    ProcName = process_name(Key),
    Pid = case whereis(ProcName) of
            undefined ->
                {ok, P} = supervisor:start_child(rabbit_mgmt_db_cache_sup,
                                                 ?CHILD(Key)),
                P;
            P -> P
          end,
    gen_server:call(Pid, {fetch, FetchFun, FunArgs}, Timeout).


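%% Derives the registered name of the cache process for a key; for an
%% illustrative key such as 'overview' this yields the atom
%% 'rabbit_mgmt_db_cache_overview'.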
-spec process_name(atom()) -> atom().
process_name(Key) ->
    list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(Key)).

-spec start_link(atom()) -> {ok, pid()} | ignore | {error, any()}.
start_link(Key) ->
    gen_server:start_link({local, process_name(Key)}, ?MODULE, [], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

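%% The multiplier is read from the 'rabbitmq_management' application
%% environment and can be tuned there, e.g. (illustrative value):
%%
%%   {rabbitmq_management, [{management_db_cache_multiplier, 5}]}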
init([]) ->
    Mult = application:get_env(rabbitmq_management, management_db_cache_multiplier,
                               ?DEFAULT_MULT),
    {ok, #state{data = none,
                args = [],
                multiplier = Mult}}.

handle_call({fetch, _FetchFun, FunArgs} = Msg, From,
            #state{data = CachedData, args = Args} = State) when
     CachedData =/= none andalso Args =/= FunArgs ->
    %% there is cached data that needs to be invalidated
    handle_call(Msg, From, ?RESET_STATE(State));
handle_call({fetch, FetchFun, FunArgs}, _From,
            #state{data = none,
                   multiplier = Mult, timer_ref = Ref} = State) ->
    %% force a gc here to clean up previously cleared data
    garbage_collect(),
    case Ref of
        R when is_reference(R) ->
            _ = erlang:cancel_timer(R);
        _ -> ok
    end,

    try timer:tc(FetchFun, FunArgs) of
        {Time, Data} ->
            %% Time is in microseconds; cache for (Time in ms) * Mult ms
            case trunc(Time / 1000 * Mult) of
                0 -> {reply, {ok, Data}, ?RESET_STATE(State)}; % no need to cache that
                T ->
                    TimerRef = erlang:send_after(T, self(), purge_cache),
                    {reply, {ok, Data}, State#state{data = Data,
                                                    timer_ref = TimerRef,
                                                    args = FunArgs}}
            end
    catch
        Throw -> {reply, {error, {throw, Throw}}, State}
    end;
handle_call({fetch, _FetchFun, _}, _From, #state{data = Data} = State) ->
    Reply = {ok, Data},
    {reply, Reply, State};
handle_call(purge_cache, _From, State) ->
    {reply, ok, ?RESET_STATE(State), hibernate}.


handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(purge_cache, State) ->
    {noreply, ?RESET_STATE(State), hibernate};
handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.