1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8%% @private
9-module(amqp_channels_manager).
10
11-include("amqp_client_internal.hrl").
12
13-behaviour(gen_server).
14
15-export([start_link/3, open_channel/4, set_channel_max/2, is_empty/1,
16         num_channels/1, pass_frame/3, signal_connection_closing/3,
17         process_channel_frame/4]).
18-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
19         handle_info/2]).
20
21-record(state, {connection,
22                channel_sup_sup,
23                map_num_pa      = gb_trees:empty(), %% Number -> {Pid, AState}
24                map_pid_num     = #{},       %% Pid -> Number
25                channel_max     = ?MAX_CHANNEL_NUMBER,
26                closing         = false}).
27
28%%---------------------------------------------------------------------------
29%% Interface
30%%---------------------------------------------------------------------------
31
32start_link(Connection, ConnName, ChSupSup) ->
33    gen_server:start_link(?MODULE, [Connection, ConnName, ChSupSup], []).
34
35open_channel(ChMgr, ProposedNumber, Consumer, InfraArgs) ->
36    gen_server:call(ChMgr, {open_channel, ProposedNumber, Consumer, InfraArgs},
37                     amqp_util:call_timeout()).
38
39set_channel_max(ChMgr, ChannelMax) ->
40    gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).
41
42is_empty(ChMgr) ->
43    gen_server:call(ChMgr, is_empty, amqp_util:call_timeout()).
44
45num_channels(ChMgr) ->
46    gen_server:call(ChMgr, num_channels, amqp_util:call_timeout()).
47
48pass_frame(ChMgr, ChNumber, Frame) ->
49    gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
50
51signal_connection_closing(ChMgr, ChannelCloseType, Reason) ->
52    gen_server:cast(ChMgr, {connection_closing, ChannelCloseType, Reason}).
53
54process_channel_frame(Frame, Channel, ChPid, AState) ->
55    case rabbit_command_assembler:process(Frame, AState) of
56        {ok, NewAState}                  -> NewAState;
57        {ok, Method, NewAState}          -> rabbit_channel_common:do(ChPid, Method),
58                                            NewAState;
59        {ok, Method, Content, NewAState} -> rabbit_channel_common:do(ChPid, Method,
60                                                                     Content),
61                                            NewAState;
62        {error, Reason}                  -> ChPid ! {channel_exit, Channel,
63                                                     Reason},
64                                            AState
65    end.
66
67%%---------------------------------------------------------------------------
68%% gen_server callbacks
69%%---------------------------------------------------------------------------
70
71init([Connection, ConnName, ChSupSup]) ->
72    ?store_proc_name(ConnName),
73    {ok, #state{connection = Connection, channel_sup_sup = ChSupSup}}.
74
75terminate(_Reason, _State) ->
76    ok.
77
78code_change(_OldVsn, State, _Extra) ->
79    {ok, State}.
80
81handle_call({open_channel, ProposedNumber, Consumer, InfraArgs}, _,
82            State = #state{closing = false}) ->
83    handle_open_channel(ProposedNumber, Consumer, InfraArgs, State);
84handle_call(is_empty, _, State) ->
85    {reply, internal_is_empty(State), State};
86handle_call(num_channels, _, State) ->
87    {reply, internal_num_channels(State), State}.
88
89handle_cast({set_channel_max, ChannelMax}, State) ->
90    {noreply, State#state{channel_max = ChannelMax}};
91handle_cast({pass_frame, ChNumber, Frame}, State) ->
92    {noreply, internal_pass_frame(ChNumber, Frame, State)};
93handle_cast({connection_closing, ChannelCloseType, Reason}, State) ->
94    handle_connection_closing(ChannelCloseType, Reason, State).
95
96handle_info({'DOWN', _, process, Pid, Reason}, State) ->
97    handle_down(Pid, Reason, State).
98
99%%---------------------------------------------------------------------------
100%% Internal plumbing
101%%---------------------------------------------------------------------------
102
103handle_open_channel(ProposedNumber, Consumer, InfraArgs,
104                    State = #state{channel_sup_sup = ChSupSup}) ->
105    case new_number(ProposedNumber, State) of
106        {ok, Number} ->
107            {ok, _ChSup, {Ch, AState}} =
108                amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
109                                                       Number, Consumer),
110            NewState = internal_register(Number, Ch, AState, State),
111            erlang:monitor(process, Ch),
112            {reply, {ok, Ch}, NewState};
113        {error, _} = Error ->
114            {reply, Error, State}
115    end.
116
117new_number(none, #state{channel_max = ChannelMax, map_num_pa = MapNPA}) ->
118    case gb_trees:is_empty(MapNPA) of
119        true  -> {ok, 1};
120        false -> {Smallest, _} = gb_trees:smallest(MapNPA),
121                 if Smallest > 1 ->
122                        {ok, Smallest - 1};
123                    true ->
124                        {Largest, _} = gb_trees:largest(MapNPA),
125                        if Largest < ChannelMax -> {ok, Largest + 1};
126                           true                 -> find_free(MapNPA)
127                        end
128                 end
129    end;
130new_number(Proposed, State = #state{channel_max = ChannelMax,
131                                    map_num_pa  = MapNPA}) ->
132    IsValid = Proposed > 0 andalso Proposed =< ChannelMax andalso
133        not gb_trees:is_defined(Proposed, MapNPA),
134    case IsValid of true  -> {ok, Proposed};
135                    false -> new_number(none, State)
136    end.
137
138find_free(MapNPA) ->
139    find_free(gb_trees:iterator(MapNPA), 1).
140
141find_free(It, Candidate) ->
142    case gb_trees:next(It) of
143        {Number, _, It1} -> if Number > Candidate ->
144                                   {ok, Number - 1};
145                               Number =:= Candidate ->
146                                   find_free(It1, Candidate + 1)
147                            end;
148        none             -> {error, out_of_channel_numbers}
149    end.
150
151handle_down(Pid, Reason, State) ->
152    case internal_lookup_pn(Pid, State) of
153        undefined -> {stop, {error, unexpected_down}, State};
154        Number    -> handle_channel_down(Pid, Number, Reason, State)
155    end.
156
157handle_channel_down(Pid, Number, Reason, State) ->
158    maybe_report_down(Pid, case Reason of {shutdown, R} -> R;
159                                          _             -> Reason
160                           end,
161                      State),
162    NewState = internal_unregister(Number, Pid, State),
163    check_all_channels_terminated(NewState),
164    {noreply, NewState}.
165
166maybe_report_down(_Pid, normal, _State) ->
167    ok;
168maybe_report_down(_Pid, shutdown, _State) ->
169    ok;
170maybe_report_down(_Pid, {app_initiated_close, _, _}, _State) ->
171    ok;
172maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) ->
173    ok;
174maybe_report_down(_Pid, {connection_closing, _}, _State) ->
175    ok;
176maybe_report_down(_Pid, {server_misbehaved, AmqpError},
177                  #state{connection = Connection}) ->
178    amqp_gen_connection:server_misbehaved(Connection, AmqpError);
179maybe_report_down(Pid, Other, #state{connection = Connection}) ->
180    amqp_gen_connection:channel_internal_error(Connection, Pid, Other).
181
182check_all_channels_terminated(#state{closing = false}) ->
183    ok;
184check_all_channels_terminated(State = #state{closing = true,
185                                             connection = Connection}) ->
186    case internal_is_empty(State) of
187        true  -> amqp_gen_connection:channels_terminated(Connection);
188        false -> ok
189    end.
190
191handle_connection_closing(ChannelCloseType, Reason,
192                          State = #state{connection = Connection}) ->
193    case internal_is_empty(State) of
194        true  -> amqp_gen_connection:channels_terminated(Connection);
195        false -> signal_channels_connection_closing(ChannelCloseType, Reason,
196                                                    State)
197    end,
198    {noreply, State#state{closing = true}}.
199
200%%---------------------------------------------------------------------------
201
202internal_pass_frame(Number, Frame, State) ->
203    case internal_lookup_npa(Number, State) of
204        undefined ->
205            ?LOG_INFO("Dropping frame ~p for invalid or closed "
206                      "channel number ~p", [Frame, Number]),
207            State;
208        {ChPid, AState} ->
209            NewAState = process_channel_frame(Frame, Number, ChPid, AState),
210            internal_update_npa(Number, ChPid, NewAState, State)
211    end.
212
213internal_register(Number, Pid, AState,
214                  State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
215    MapNPA1 = gb_trees:enter(Number, {Pid, AState}, MapNPA),
216    MapPN1 = maps:put(Pid, Number, MapPN),
217    State#state{map_num_pa  = MapNPA1,
218                map_pid_num = MapPN1}.
219
220internal_unregister(Number, Pid,
221                    State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
222    MapNPA1 = gb_trees:delete(Number, MapNPA),
223    MapPN1 = maps:remove(Pid, MapPN),
224    State#state{map_num_pa  = MapNPA1,
225                map_pid_num = MapPN1}.
226
227internal_is_empty(#state{map_num_pa = MapNPA}) ->
228    gb_trees:is_empty(MapNPA).
229
230internal_num_channels(#state{map_num_pa = MapNPA}) ->
231    gb_trees:size(MapNPA).
232
233internal_lookup_npa(Number, #state{map_num_pa = MapNPA}) ->
234    case gb_trees:lookup(Number, MapNPA) of {value, PA} -> PA;
235                                            none        -> undefined
236    end.
237
238internal_lookup_pn(Pid, #state{map_pid_num = MapPN}) ->
239    case maps:find(Pid, MapPN) of {ok, Number} -> Number;
240                                  error        -> undefined
241    end.
242
243internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) ->
244    State#state{map_num_pa = gb_trees:update(Number, {Pid, AState}, MapNPA)}.
245
246signal_channels_connection_closing(ChannelCloseType, Reason,
247                                   #state{map_pid_num = MapPN}) ->
248    [amqp_channel:connection_closing(Pid, ChannelCloseType, Reason)
249        || Pid <- maps:keys(MapPN)].
250