1%% This Source Code Form is subject to the terms of the Mozilla Public 2%% License, v. 2.0. If a copy of the MPL was not distributed with this 3%% file, You can obtain one at https://mozilla.org/MPL/2.0/. 4%% 5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved. 6%% 7 8-module(credit_flow). 9 10%% Credit flow is controlled by a credit specification - a 11%% {InitialCredit, MoreCreditAfter} tuple. For the message sender, 12%% credit starts at InitialCredit and is decremented with every 13%% message sent. The message receiver grants more credit to the sender 14%% by sending it a {bump_credit, ...} control message after receiving 15%% MoreCreditAfter messages. The sender should pass this message in to 16%% handle_bump_msg/1. The sender should block when its credit reaches 0 17%% (check by invoking blocked/0). If a process is both a sender and a 18%% receiver it will not grant any more credit to its senders when it 19%% is itself blocked - thus the only processes that need to check 20%% blocked/0 are ones that read from network sockets. 21%% 22%% Credit flows left to right when processes send messages down the 23%% chain, starting at the rabbit_reader, ending at the msg_store: 24%% reader -> channel -> queue_process -> msg_store. 25%% 26%% If the message store has a backlog, then it will block the 27%% queue_process, which will block the channel, and finally the reader 28%% will be blocked, throttling down publishers. 29%% 30%% Once a process is unblocked, it will grant credits up the chain, 31%% possibly unblocking other processes: 32%% reader <--grant channel <--grant queue_process <--grant msg_store. 33%% 34%% Grepping the project files for `credit_flow` will reveal the places 35%% where this module is currently used, with extra comments on what's 36%% going on at each instance. Note that credit flow between mirrors 37%% synchronization has not been documented, since this doesn't affect 38%% client publishes. 
-define(DEFAULT_INITIAL_CREDIT, 200).
-define(DEFAULT_MORE_CREDIT_AFTER, 100).

%% Default credit_spec() used by send/1 and ack/1. The value is looked up
%% via rabbit_misc:get_env/3 (application `rabbit`, key
%% credit_flow_default_credit) the first time a process needs it and is
%% then cached in that process's dictionary, so later expansions only pay
%% for a get/1. NB: the expansion binds the variable Val in the calling
%% function.
-define(DEFAULT_CREDIT,
        case get(credit_flow_default_credit) of
            undefined ->
                Val = rabbit_misc:get_env(rabbit, credit_flow_default_credit,
                                           {?DEFAULT_INITIAL_CREDIT,
                                            ?DEFAULT_MORE_CREDIT_AFTER}),
                put(credit_flow_default_credit, Val),
                Val;
            Val       -> Val
        end).

-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
-export([block/1, unblock/1]).

%%----------------------------------------------------------------------------

-export_type([bump_msg/0]).

%% Payload of a {bump_credit, ...} control message: the granting process
%% and the amount of credit it grants.
-opaque bump_msg() :: {pid(), non_neg_integer()}.
%% {InitialCredit, MoreCreditAfter}
-type credit_spec() :: {non_neg_integer(), non_neg_integer()}.

%% send/1 and send/2 are distinct functions and need one spec each; a
%% single two-clause spec would describe two overloads of send/1 only.
-spec send(pid()) -> 'ok'.
-spec send(pid(), credit_spec()) -> 'ok'.
-spec ack(pid()) -> 'ok'.
-spec ack(pid(), credit_spec()) -> 'ok'.
-spec handle_bump_msg(bump_msg()) -> 'ok'.
-spec blocked() -> boolean().
-spec state() -> 'flow' | 'running'.
-spec peer_down(pid()) -> 'ok'.
%% block/1 ends in a process-dictionary update, so its result is whatever
%% put/2 hands back; callers should not rely on it.
-spec block(pid()) -> term().
-spec unblock(pid()) -> 'ok'.

%%----------------------------------------------------------------------------

%% process dict update macro - eliminates the performance-hurting
%% closure creation a HOF would introduce
%%
%% Key     - process dictionary key to update
%% Default - value to use when Key is not set yet
%% Var     - variable bound to the current (or default) value
%% Expr    - expression, usually mentioning Var, whose result is stored
%%
%% The whole expression evaluates to the result of put/2, i.e. the value
%% previously stored under Key (or undefined).
-define(UPDATE(Key, Default, Var, Expr),
        begin
            %% We deliberately allow Var to escape from the case here
            %% to be used in Expr. Any temporary var we introduced
            %% would also escape, and might conflict.
            Var = case get(Key) of
                      undefined -> Default;
                      V         -> V
                  end,
            put(Key, Expr)
        end).

%% If current process was blocked by credit flow in the last
%% STATE_CHANGE_INTERVAL microseconds (i.e. one second), state/0 will
%% report it as "in flow".
-define(STATE_CHANGE_INTERVAL, 1000000).

-ifdef(CREDIT_FLOW_TRACING).
%% Tracing hooks, only active when CREDIT_FLOW_TRACING is defined at
%% compile time; otherwise both macros expand to the atom ok.
%% The timestamp unit must be a valid erlang:time_unit(): `millisecond`
%% (the atom `milliseconds` is not one and makes os:system_time/1 fail
%% with badarg).
%% NOTE(review): erlang:process_info/1 only accepts local pids - confirm
%% FROM can never be a process on another node when tracing is enabled.
-define(TRACE_BLOCKED(SELF, FROM), rabbit_event:notify(credit_flow_blocked,
                                     [{process, SELF},
                                      {process_info, erlang:process_info(SELF)},
                                      {from, FROM},
                                      {from_info, erlang:process_info(FROM)},
                                      {timestamp,
                                       os:system_time(millisecond)}])).
-define(TRACE_UNBLOCKED(SELF, FROM), rabbit_event:notify(credit_flow_unblocked,
                                       [{process, SELF},
                                        {from, FROM},
                                        {timestamp,
                                         os:system_time(millisecond)}])).
-else.
-define(TRACE_BLOCKED(SELF, FROM), ok).
-define(TRACE_UNBLOCKED(SELF, FROM), ok).
-endif.

%%----------------------------------------------------------------------------

%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
%% messages. This is the clearest I can make it (since the function
%% names form the API and want to make sense externally, while the
%% variable names are used in credit bookkeeping and want to make
%% sense internally).

%% For any given pair of processes, ack/2 and send/2 must always be
%% called with the same credit_spec().

%% Same as send/2 with the default credit specification.
send(From) -> send(From, ?DEFAULT_CREDIT).

%% Accounts for one message sent by the calling process to From (the
%% peer that grants us credit). The remaining credit is kept in the
%% process dictionary under {credit_from, From}, starting at
%% InitialCredit. The send that uses up the last unit of credit marks the
%% caller as blocked on From. Always returns ok, as the -spec promises.
send(From, {InitialCredit, _MoreCreditAfter}) ->
    ?UPDATE({credit_from, From}, InitialCredit, C,
            if C == 1 -> block(From),
                         0;
               true   -> C - 1
            end),
    ok.

%% Same as ack/2 with the default credit specification.
ack(To) -> ack(To, ?DEFAULT_CREDIT).

%% Accounts for one message received by the calling process from To (the
%% peer we grant credit to). A countdown is kept under {credit_to, To};
%% every MoreCreditAfter-th call grants To another MoreCreditAfter units
%% of credit and restarts the countdown. Always returns ok.
ack(To, {_InitialCredit, MoreCreditAfter}) ->
    ?UPDATE({credit_to, To}, MoreCreditAfter, C,
            if C == 1 -> grant(To, MoreCreditAfter),
                         MoreCreditAfter;
               true   -> C - 1
            end),
    ok.

%% Handles the payload of a {bump_credit, ...} message: adds MoreCredit to
%% the credit we hold from From and, if that takes the balance from
%% "none left" (=< 0) to positive, stops being blocked on From. Always
%% returns ok.
handle_bump_msg({From, MoreCredit}) ->
    ?UPDATE({credit_from, From}, 0, C,
            if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
                                                    C + MoreCredit;
               true                              -> C + MoreCredit
            end),
    ok.

%% True while the calling process is blocked on at least one peer, i.e.
%% while the credit_blocked list in its process dictionary is non-empty.
blocked() ->
    case get(credit_blocked) of
        undefined -> false;
        []        -> false;
        _         -> true
    end.
%% Reports whether the calling process is, or very recently was, blocked
%% by credit flow: `flow` while blocked/0 is true or when the last block
%% started less than ?STATE_CHANGE_INTERVAL microseconds ago, `running`
%% otherwise.
state() ->
    case blocked() of
        true  -> flow;
        false -> state_since(get(credit_blocked_at))
    end.

%% Classifies a process that is not blocked right now, given the
%% monotonic time (native units) at which it last became blocked, or
%% undefined if it never was.
state_since(undefined) ->
    running;
state_since(BlockedAt) ->
    Elapsed = erlang:convert_time_unit(erlang:monotonic_time() - BlockedAt,
                                       native, microsecond),
    case Elapsed < ?STATE_CHANGE_INTERVAL of
        true  -> flow;
        false -> running
    end.

%% Forgets all credit bookkeeping for Peer: stops being blocked on it
%% (which may release deferred grants) and erases both credit counters.
peer_down(Peer) ->
    %% In theory we could also remove it from credit_deferred here, but it
    %% doesn't really matter; at some point later we will drain
    %% credit_deferred and thus send messages into the void...
    unblock(Peer),
    erase({credit_from, Peer}),
    erase({credit_to, Peer}),
    ok.

%% --------------------------------------------------------------------------

%% Grants Quantity units of credit to To by sending it a bump_credit
%% message. While the calling process is itself blocked the message is
%% not sent but queued under credit_deferred; unblock/1 delivers it later.
grant(To, Quantity) ->
    Msg = {bump_credit, {self(), Quantity}},
    case blocked() of
        false -> To ! Msg;
        true  -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
    end.

%% Marks the calling process as blocked on From. The time of the
%% transition from "not blocked" to "blocked" is remembered in
%% credit_blocked_at for state/0.
block(From) ->
    ?TRACE_BLOCKED(self(), From),
    case blocked() of
        false -> put(credit_blocked_at, erlang:monotonic_time());
        true  -> ok
    end,
    ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).

%% Removes one occurrence of From from the list of peers the calling
%% process is blocked on. If that leaves the process unblocked, every
%% grant deferred by grant/2 is sent now. Always returns ok.
unblock(From) ->
    ?TRACE_UNBLOCKED(self(), From),
    ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
    case blocked() of
        false -> flush_deferred(erase(credit_deferred));
        true  -> ok
    end.

%% Sends the grants that were queued while the process was blocked, in
%% the order they are stored (most recently deferred first).
flush_deferred(undefined) ->
    ok;
flush_deferred(Credits) ->
    lists:foreach(fun ({To, Msg}) -> To ! Msg end, Credits).