1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2016-2021 VMware, Inc. or its affiliates. All rights reserved. 6 7-module(rabbit_mgmt_db_cache). 8 9-behaviour(gen_server). 10 11%% API functions 12-export([start_link/1]). 13-export([process_name/1, 14 fetch/2, 15 fetch/3, 16 fetch/4]). 17 18%% gen_server callbacks 19-export([init/1, 20 handle_call/3, 21 handle_cast/2, 22 handle_info/2, 23 terminate/2, 24 code_change/3]). 25 26-record(state, {data :: any() | none, 27 args :: [any()], 28 timer_ref :: undefined | reference(), 29 multiplier :: integer()}). 30 31-type error_desc() :: key_not_found | timeout | {throw, atom()}. 32-type fetch_fun() :: fun((_) -> any()) | fun(() -> any()). 33-type fetch_ret() :: {ok, any()} | {error, error_desc()}. 34 35-define(DEFAULT_MULT, 5). 36-define(DEFAULT_TIMEOUT, 60000). 37-define(CHILD(Key), {rabbit_mgmt_db_cache:process_name(Key), 38 {rabbit_mgmt_db_cache, start_link, [Key]}, 39 permanent, 5000, worker, 40 [rabbit_mgmt_db_cache]}). 41-define(RESET_STATE(State), State#state{data = none, args = []}). 42 43%% Implements an adaptive cache that times the value generating fun 44%% and uses the return value as the cached value for the time it took 45%% to produce multiplied by some factor (defaults to 5). 46%% There is one cache process per key. New processes are started as 47%% required. The cache is invalidated if the arguments to the fetch 48%% fun have changed. 49 50 51%%%=================================================================== 52%%% API functions 53%%%=================================================================== 54 55-spec fetch(atom(), fetch_fun()) -> fetch_ret(). 56fetch(Key, FetchFun) -> 57 fetch(Key, FetchFun, []). 58 59-spec fetch(atom(), fetch_fun(), [any()]) -> fetch_ret(). 60fetch(Key, FetchFun, Args) when is_list(Args) -> 61 fetch(Key, FetchFun, Args, ?DEFAULT_TIMEOUT). 62 63-spec fetch(atom(), fetch_fun(), [any()], integer()) -> fetch_ret(). 64fetch(Key, FetchFun, FunArgs, Timeout) -> 65 ProcName = process_name(Key), 66 Pid = case whereis(ProcName) of 67 undefined -> 68 {ok, P} = supervisor:start_child(rabbit_mgmt_db_cache_sup, 69 ?CHILD(Key)), 70 P; 71 P -> P 72 end, 73 gen_server:call(Pid, {fetch, FetchFun, FunArgs}, Timeout). 74 75 76-spec process_name(atom()) -> atom(). 77process_name(Key) -> 78 list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(Key)). 79 80-spec start_link(atom()) -> {ok, pid()} | ignore | {error, any()}. 81start_link(Key) -> 82 gen_server:start_link({local, process_name(Key)}, ?MODULE, [], []). 83 84%%%=================================================================== 85%%% gen_server callbacks 86%%%=================================================================== 87 88init([]) -> 89 Mult = application:get_env(rabbitmq_management, management_db_cache_multiplier, 90 ?DEFAULT_MULT), 91 {ok, #state{data = none, 92 args = [], 93 multiplier = Mult}}. 94 95handle_call({fetch, _FetchFun, FunArgs} = Msg, From, 96 #state{data = CachedData, args = Args} = State) when 97 CachedData =/= none andalso Args =/= FunArgs -> 98 %% there is cached data that needs to be invalidated 99 handle_call(Msg, From, ?RESET_STATE(State)); 100handle_call({fetch, FetchFun, FunArgs}, _From, 101 #state{data = none, 102 multiplier = Mult, timer_ref = Ref} = State) -> 103 %% force a gc here to clean up previously cleared data 104 garbage_collect(), 105 case Ref of 106 R when is_reference(R) -> 107 _ = erlang:cancel_timer(R); 108 _ -> ok 109 end, 110 111 try timer:tc(FetchFun, FunArgs) of 112 {Time, Data} -> 113 case trunc(Time / 1000 * Mult) of 114 0 -> {reply, {ok, Data}, ?RESET_STATE(State)}; % no need to cache that 115 T -> 116 TimerRef = erlang:send_after(T, self(), purge_cache), 117 {reply, {ok, Data}, State#state{data = Data, 118 timer_ref = TimerRef, 119 args = FunArgs}} 120 end 121 catch 122 Throw -> {reply, {error, {throw, Throw}}, State} 123 end; 124handle_call({fetch, _FetchFun, _}, _From, #state{data = Data} = State) -> 125 Reply = {ok, Data}, 126 {reply, Reply, State}; 127handle_call(purge_cache, _From, State) -> 128 {reply, ok, ?RESET_STATE(State), hibernate}. 129 130 131handle_cast(_Msg, State) -> 132 {noreply, State}. 133 134handle_info(purge_cache, State) -> 135 {noreply, ?RESET_STATE(State), hibernate}; 136handle_info(_Info, State) -> 137 {noreply, State}. 138 139terminate(_Reason, _State) -> 140 ok. 141 142code_change(_OldVsn, State, _Extra) -> 143 {ok, State}. 144