1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(credit_flow).
9
10%% Credit flow is controlled by a credit specification - a
11%% {InitialCredit, MoreCreditAfter} tuple. For the message sender,
12%% credit starts at InitialCredit and is decremented with every
13%% message sent. The message receiver grants more credit to the sender
14%% by sending it a {bump_credit, ...} control message after receiving
15%% MoreCreditAfter messages. The sender should pass this message in to
%% handle_bump_msg/1. The sender should block when its credit reaches 0
17%% (check by invoking blocked/0). If a process is both a sender and a
18%% receiver it will not grant any more credit to its senders when it
19%% is itself blocked - thus the only processes that need to check
20%% blocked/0 are ones that read from network sockets.
21%%
%% Credit flows left to right when processes send messages down the
23%% chain, starting at the rabbit_reader, ending at the msg_store:
24%%  reader -> channel -> queue_process -> msg_store.
25%%
26%% If the message store has a back log, then it will block the
27%% queue_process, which will block the channel, and finally the reader
28%% will be blocked, throttling down publishers.
29%%
30%% Once a process is unblocked, it will grant credits up the chain,
31%% possibly unblocking other processes:
32%% reader <--grant channel <--grant queue_process <--grant msg_store.
33%%
34%% Grepping the project files for `credit_flow` will reveal the places
35%% where this module is currently used, with extra comments on what's
%% going on at each instance. Note that credit flow during mirror
%% synchronization has not been documented, since this doesn't affect
%% client publishes.
39
%% Fallback {InitialCredit, MoreCreditAfter} values, passed as the
%% default to rabbit_misc:get_env/3 by ?DEFAULT_CREDIT below.
-define(DEFAULT_INITIAL_CREDIT, 200).
-define(DEFAULT_MORE_CREDIT_AFTER, 100).

%% Expands to an expression yielding the default credit specification
%% for the calling process. The value is cached in the process
%% dictionary under credit_flow_default_credit: on the first use it is
%% fetched with rabbit_misc:get_env/3 (presumably the `rabbit'
%% application environment, falling back to the defaults above -
%% confirm against rabbit_misc) and stored; later uses read the cached
%% value. Note that the expansion binds the variable Val in the scope
%% of the function that uses the macro.
-define(DEFAULT_CREDIT,
        case get(credit_flow_default_credit) of
            undefined ->
                Val = rabbit_misc:get_env(rabbit, credit_flow_default_credit,
                                           {?DEFAULT_INITIAL_CREDIT,
                                            ?DEFAULT_MORE_CREDIT_AFTER}),
                put(credit_flow_default_credit, Val),
                Val;
            Val       -> Val
        end).
53
54-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
55-export([peer_down/1]).
56-export([block/1, unblock/1]).
57
58%%----------------------------------------------------------------------------
59
-export_type([bump_msg/0]).

%% Payload of a {bump_credit, ...} control message: the pid of the
%% process granting credit and the amount of credit granted.
-opaque(bump_msg() :: {pid(), non_neg_integer()}).
%% {InitialCredit, MoreCreditAfter}
-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).

%% send/1 takes only the peer; the credit specification is the second
%% argument of send/2 (mirroring ack/1 and ack/2).
-spec send(pid()) -> 'ok'.
-spec send(pid(), credit_spec()) -> 'ok'.
-spec ack(pid()) -> 'ok'.
-spec ack(pid(), credit_spec()) -> 'ok'.
-spec handle_bump_msg(bump_msg()) -> 'ok'.
-spec blocked() -> boolean().
-spec state() -> 'flow' | 'running'.
-spec peer_down(pid()) -> 'ok'.
-spec unblock(pid()) -> 'ok'.
73
74%%----------------------------------------------------------------------------
75
%% process dict update macro - eliminates the performance-hurting
%% closure creation a HOF would introduce
%%
%%   Key     - process dictionary key to update
%%   Default - value bound to Var when Key is not in the dictionary
%%   Var     - variable name that gets bound to the current value
%%   Expr    - expression (may refer to Var) whose result is stored
%%             under Key
%%
%% The expansion evaluates to the result of put/2, i.e. the value
%% previously stored under Key, or 'undefined' if there was none.
%% Both Var and the helper variable V end up in the scope of the
%% function using the macro, so Expr and the surrounding code must not
%% use a variable named V for anything else.
-define(UPDATE(Key, Default, Var, Expr),
        begin
            %% We deliberately allow Var to escape from the case here
            %% to be used in Expr. Any temporary var we introduced
            %% would also escape, and might conflict.
            Var = case get(Key) of
                undefined -> Default;
                V         -> V
            end,
            put(Key, Expr)
        end).
89
%% If the current process was blocked by credit flow in the last
%% STATE_CHANGE_INTERVAL microseconds (1000000, i.e. one second),
%% state/0 will report it as "in flow".
-define(STATE_CHANGE_INTERVAL, 1000000).
94
%% Optional tracing of block/unblock transitions. When the module is
%% compiled with CREDIT_FLOW_TRACING defined, the macros emit
%% credit_flow_blocked / credit_flow_unblocked notifications through
%% rabbit_event:notify/2; otherwise they expand to the atom 'ok'.
%%
%% The timestamp uses the time unit 'millisecond': the previously used
%% atom 'milliseconds' is not a valid erlang:time_unit() and makes
%% os:system_time/1 fail with badarg.
%%
%% NOTE(review): erlang:process_info/1 only accepts pids of local
%% processes; confirm FROM is always local when tracing is enabled.
-ifdef(CREDIT_FLOW_TRACING).
-define(TRACE_BLOCKED(SELF, FROM), rabbit_event:notify(credit_flow_blocked,
                                     [{process, SELF},
                                      {process_info, erlang:process_info(SELF)},
                                      {from, FROM},
                                      {from_info, erlang:process_info(FROM)},
                                      {timestamp,
                                       os:system_time(
                                         millisecond)}])).
-define(TRACE_UNBLOCKED(SELF, FROM), rabbit_event:notify(credit_flow_unblocked,
                                       [{process, SELF},
                                        {from, FROM},
                                        {timestamp,
                                         os:system_time(
                                           millisecond)}])).
-else.
-define(TRACE_BLOCKED(SELF, FROM), ok).
-define(TRACE_UNBLOCKED(SELF, FROM), ok).
-endif.
114
115%%----------------------------------------------------------------------------
116
117%% There are two "flows" here; of messages and of credit, going in
118%% opposite directions. The variable names "From" and "To" refer to
119%% the flow of credit, but the function names refer to the flow of
120%% messages. This is the clearest I can make it (since the function
121%% names form the API and want to make sense externally, while the
122%% variable names are used in credit bookkeeping and want to make
123%% sense internally).
124
125%% For any given pair of processes, ack/2 and send/2 must always be
126%% called with the same credit_spec().
127
%% Record that the calling process has sent one message to From,
%% using the default credit specification (?DEFAULT_CREDIT).
send(From) -> send(From, ?DEFAULT_CREDIT).

%% Record that the calling process has sent one message to From.
%%
%% The remaining credit is kept in the process dictionary under
%% {credit_from, From} and starts at InitialCredit. Spending the last
%% unit of credit (1 -> 0) blocks the caller on From via block/1; any
%% other value is simply decremented, so the credit goes negative if
%% the caller keeps sending while blocked.
%%
%% Always returns 'ok' (as the -spec promises) instead of leaking the
%% previous dictionary value returned by put/2.
send(From, {InitialCredit, _MoreCreditAfter}) ->
    ?UPDATE({credit_from, From}, InitialCredit, C,
            if C == 1 -> block(From),
                         0;
               true   -> C - 1
            end),
    ok.
136
%% Record that the calling process has received one message from To,
%% using the default credit specification (?DEFAULT_CREDIT).
ack(To) -> ack(To, ?DEFAULT_CREDIT).

%% Record that the calling process has received one message from To.
%%
%% A countdown is kept in the process dictionary under
%% {credit_to, To}, starting at MoreCreditAfter. When the countdown
%% is at 1, MoreCreditAfter credit is granted to To via grant/2 and
%% the countdown is reset; otherwise it is decremented.
%%
%% Always returns 'ok' (as the -spec promises) instead of leaking the
%% previous dictionary value returned by put/2.
ack(To, {_InitialCredit, MoreCreditAfter}) ->
    ?UPDATE({credit_to, To}, MoreCreditAfter, C,
            if C == 1 -> grant(To, MoreCreditAfter),
                         MoreCreditAfter;
               true   -> C - 1
            end),
    ok.
145
%% Process the payload of a {bump_credit, ...} message: add MoreCredit
%% to the credit the calling process holds for From (kept under
%% {credit_from, From}, treated as 0 when absent).
%%
%% If the credit was exhausted (=< 0) and the bump makes it positive
%% again, the caller is unblocked with respect to From via unblock/1.
%%
%% Always returns 'ok' (as the -spec promises) instead of leaking the
%% previous dictionary value returned by put/2.
handle_bump_msg({From, MoreCredit}) ->
    ?UPDATE({credit_from, From}, 0, C,
            if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
                                                    C + MoreCredit;
               true                              -> C + MoreCredit
            end),
    ok.
152
%% Tell whether the calling process is currently blocked by credit
%% flow, i.e. whether the credit_blocked entry of its process
%% dictionary exists and is not the empty list.
blocked() ->
    Blocks = get(credit_blocked),
    Blocks =/= undefined andalso Blocks =/= [].
158
%% Report the flow state of the calling process: 'flow' while it is
%% blocked, and also for STATE_CHANGE_INTERVAL microseconds after the
%% time recorded under credit_blocked_at by block/1; 'running'
%% otherwise.
state() ->
    case blocked() of
        true  -> flow;
        false -> recently_blocked_state(get(credit_blocked_at))
    end.

%% State of a process that is not blocked right now, given the
%% monotonic time (native units) at which it last became blocked, or
%% 'undefined' if no such time has been recorded.
recently_blocked_state(undefined) ->
    running;
recently_blocked_state(BlockedAt) ->
    Elapsed = erlang:convert_time_unit(erlang:monotonic_time() - BlockedAt,
                                       native,
                                       microsecond),
    case Elapsed < ?STATE_CHANGE_INTERVAL of
        true  -> flow;
        false -> running
    end.
173
%% Forget everything about Peer once it has gone away: stop being
%% blocked on it and drop the credit counters kept for it in both
%% directions.
peer_down(Peer) ->
    %% Entries for Peer may remain in credit_deferred; that is
    %% harmless, since draining credit_deferred later merely sends
    %% those messages into the void.
    unblock(Peer),
    _ = [erase({Direction, Peer}) || Direction <- [credit_from, credit_to]],
    ok.
182
183%% --------------------------------------------------------------------------
184
%% Grant Quantity credit to To by means of a bump_credit message. The
%% message is sent straight away unless the calling process is itself
%% blocked, in which case it is queued under credit_deferred in the
%% process dictionary.
grant(To, Quantity) ->
    Bump = {bump_credit, {self(), Quantity}},
    SelfBlocked = blocked(),
    if
        SelfBlocked -> ?UPDATE(credit_deferred, [], Pending,
                               [{To, Bump} | Pending]);
        true        -> To ! Bump
    end.
191
%% Mark the calling process as blocked on From by adding From to the
%% credit_blocked list in the process dictionary. When the process
%% was not blocked before, the current monotonic time is stored under
%% credit_blocked_at first.
block(From) ->
    ?TRACE_BLOCKED(self(), From),
    AlreadyBlocked = blocked(),
    if
        AlreadyBlocked -> ok;
        true           -> put(credit_blocked_at, erlang:monotonic_time())
    end,
    ?UPDATE(credit_blocked, [], Peers, [From | Peers]).
199
%% Remove one occurrence of From from the credit_blocked list of the
%% calling process. If the process is no longer blocked afterwards,
%% all bump_credit messages queued under credit_deferred are sent and
%% the queue is erased. Returns 'ok'.
unblock(From) ->
    ?TRACE_UNBLOCKED(self(), From),
    ?UPDATE(credit_blocked, [], Peers, Peers -- [From]),
    StillBlocked = blocked(),
    if
        StillBlocked -> ok;
        true         -> flush_deferred(erase(credit_deferred))
    end.

%% Send every deferred {To, Msg} pair; 'undefined' means nothing was
%% deferred.
flush_deferred(undefined) ->
    ok;
flush_deferred(Deferred) ->
    _ = [Dest ! Bump || {Dest, Bump} <- Deferred],
    ok.
211